You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/18 00:27:15 UTC

[1/2] incubator-usergrid git commit: Merge branch 'two-dot-o' into USERGRID-405

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-405-merge [created] 762e43e1e


Merge branch 'two-dot-o' into USERGRID-405

Conflicts:
	stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
	stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java
	stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
	stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
	stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
	stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7b5d2224
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7b5d2224
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7b5d2224

Branch: refs/heads/USERGRID-405-merge
Commit: 7b5d2224512f0043336acbd9b9323e1e32edf888
Parents: 5c7a5f8 b53cb07
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 17 11:36:08 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 17 11:36:08 2015 -0600

----------------------------------------------------------------------
 .../main/dist/init_instance/init_rest_server.sh |   2 +
 .../dist/init_instance/install_elasticsearch.sh |  22 +-
 .../main/groovy/configure_elasticsearch.groovy  |  80 ++--
 .../src/main/groovy/configure_usergrid.groovy   |  12 +-
 stack/awscluster/ugcluster-cf.json              | 466 +++++++++++++------
 stack/core/pom.xml                              |  10 -
 .../batch/service/JobSchedulerService.java      |   5 -
 .../usergrid/corepersistence/CoreModule.java    |  36 +-
 .../corepersistence/CpEntityManager.java        |  64 ++-
 .../corepersistence/CpEntityManagerFactory.java |   4 -
 .../corepersistence/CpRelationManager.java      |  14 -
 .../events/EntityDeletedHandler.java            |  58 ++-
 .../events/EntityVersionCreatedHandler.java     |  60 ++-
 .../events/EntityVersionDeletedHandler.java     |  53 ++-
 .../results/FilteringLoader.java                |   2 +-
 .../usergrid/persistence/EntityManager.java     |  10 +-
 .../persistence/EntityManagerFactory.java       |   2 -
 .../cassandra/EntityManagerFactoryImpl.java     |   3 -
 .../cassandra/EntityManagerImpl.java            |  31 +-
 .../cassandra/RelationManagerImpl.java          |  58 +--
 .../cassandra/index/ConnectedIndexScanner.java  |   2 -
 .../cassandra/index/IndexBucketScanner.java     |   2 -
 .../corepersistence/StaleIndexCleanupTest.java  |  38 +-
 .../usergrid/persistence/EntityManagerIT.java   |   1 +
 .../PerformanceEntityRebuildIndexTest.java      |   2 +
 .../collection/EntityCollectionManager.java     |  20 +-
 .../collection/EntityDeletedFactory.java        |  34 --
 .../collection/EntityVersionCleanupFactory.java |  35 --
 .../collection/EntityVersionCreatedFactory.java |  31 --
 .../persistence/collection/FieldSet.java        |  48 ++
 .../cache/CachedEntityCollectionManager.java    |  11 +-
 .../collection/guice/CollectionModule.java      |  11 +-
 .../EntityCollectionManagerFactoryImpl.java     |  32 +-
 .../impl/EntityCollectionManagerImpl.java       | 251 ++++++++--
 .../collection/impl/EntityDeletedTask.java      |   9 +-
 .../impl/EntityVersionCleanupTask.java          |  34 +-
 .../impl/EntityVersionTaskFactory.java          |  65 +++
 .../mvcc/stage/write/WriteUniqueVerify.java     | 140 +++---
 .../UniqueValueSerializationStrategy.java       |  12 +
 .../serialization/impl/MutableFieldSet.java     |  63 +++
 .../UniqueValueSerializationStrategyImpl.java   |  27 +-
 .../collection/util/EntityUtils.java            |   4 +-
 .../collection/EntityCollectionManagerIT.java   |  68 +++
 .../mvcc/stage/write/WriteUniqueVerifyTest.java |   6 +-
 .../core/astyanax/CassandraConfig.java          |   6 +
 .../core/astyanax/CassandraConfigImpl.java      |   8 +-
 .../persistence/core/astyanax/CassandraFig.java |   9 +-
 .../persistence/core/future/BetterFuture.java   |  43 +-
 .../core/metrics/MetricsFactory.java            |   9 +
 .../core/metrics/MetricsFactoryImpl.java        | 121 +++--
 .../core/astyanax/ColumnNameIteratorTest.java   |   7 +-
 .../MultiKeyColumnNameIteratorTest.java         |   7 +-
 .../astyanax/MultiRowColumnIteratorTest.java    |   7 +-
 stack/corepersistence/graph/pom.xml             |  28 +-
 .../graph/impl/GraphManagerImpl.java            | 288 ++++++++++--
 .../usergrid/persistence/map/MapManager.java    |  14 +-
 .../persistence/map/impl/MapManagerImpl.java    |   8 +
 .../persistence/map/impl/MapSerialization.java  |   9 +
 .../map/impl/MapSerializationImpl.java          | 130 +++++-
 .../persistence/map/MapManagerTest.java         |  49 +-
 stack/corepersistence/queryindex/pom.xml        |  17 +-
 .../usergrid/persistence/index/EntityIndex.java |  26 +-
 .../persistence/index/IndexBufferConsumer.java  |  11 +
 .../persistence/index/IndexBufferProducer.java  |   1 -
 .../usergrid/persistence/index/IndexFig.java    |  73 ++-
 .../index/IndexOperationMessage.java            | 115 ++++-
 .../persistence/index/guice/IndexModule.java    |   7 +
 .../persistence/index/guice/QueueProvider.java  | 116 +++++
 .../persistence/index/impl/BatchRequest.java    |  41 ++
 .../persistence/index/impl/BufferQueue.java     |  68 +++
 .../index/impl/BufferQueueInMemoryImpl.java     | 108 +++++
 .../index/impl/BufferQueueSQSImpl.java          | 307 ++++++++++++
 .../persistence/index/impl/DeIndexRequest.java  | 115 +++++
 .../index/impl/EsEntityIndexBatchImpl.java      |  50 +-
 .../index/impl/EsEntityIndexImpl.java           | 176 ++-----
 .../index/impl/EsIndexBufferConsumerImpl.java   | 286 ++++++++----
 .../index/impl/EsIndexBufferProducerImpl.java   |  16 +-
 .../persistence/index/impl/EsIndexCache.java    | 138 +++---
 .../persistence/index/impl/IndexRequest.java    | 125 +++++
 .../index/guice/TestIndexModule.java            |   3 +-
 .../index/impl/BufferQueueSQSImplTest.java      | 169 +++++++
 .../impl/EntityConnectionIndexImplTest.java     |   5 +-
 .../persistence/index/impl/EntityIndexTest.java |  20 +-
 .../persistence/index/impl/EsTestUtils.java     |  48 --
 .../persistence/queue/QueueManager.java         |   4 +-
 .../usergrid/persistence/queue/QueueScope.java  |   2 +-
 .../persistence/queue/QueueScopeFactory.java    |  34 --
 .../persistence/queue/guice/QueueModule.java    |  17 +-
 .../queue/impl/QueueScopeFactoryImpl.java       |  48 --
 .../persistence/queue/impl/QueueScopeImpl.java  |  27 +-
 .../queue/impl/SQSQueueManagerImpl.java         | 214 +++++----
 .../persistence/queue/NoAWSCredsRule.java       |  98 ++++
 .../persistence/queue/QueueManagerTest.java     |  29 +-
 stack/pom.xml                                   |  43 --
 stack/rest/pom.xml                              |   4 -
 .../org/apache/usergrid/rest/RootResource.java  |   6 -
 stack/services/pom.xml                          |  13 +-
 .../cassandra/ManagementServiceImpl.java        |   9 +-
 .../services/AbstractCollectionService.java     |  40 +-
 .../services/AbstractConnectionsService.java    |  39 +-
 .../notifications/NotificationsService.java     |  50 +-
 .../services/notifications/QueueListener.java   |   5 +-
 .../usergrid/services/queues/QueueListener.java |   5 +-
 .../notifications/NotifiersServiceIT.java       |   6 +
 stack/test-utils/pom.xml                        |   5 -
 105 files changed, 3849 insertions(+), 1571 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/core/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index cf927ce,161037b..5da7067
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@@ -33,21 -30,17 +30,15 @@@ import org.apache.usergrid.persistence.
  import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
  import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
  import org.apache.usergrid.persistence.collection.guice.CollectionModule;
- import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
  import org.apache.usergrid.persistence.core.guice.CommonModule;
  import org.apache.usergrid.persistence.core.migration.data.DataMigration;
- import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
- import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin;
  import org.apache.usergrid.persistence.graph.guice.GraphModule;
- import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode;
  import org.apache.usergrid.persistence.index.guice.IndexModule;
 -import org.apache.usergrid.persistence.index.impl.BufferQueue;
 -import org.apache.usergrid.persistence.index.impl.BufferQueueSQSImpl;
  import org.apache.usergrid.persistence.map.guice.MapModule;
  import org.apache.usergrid.persistence.queue.guice.QueueModule;
- 
- import com.google.inject.AbstractModule;
- import com.google.inject.Provider;
- import com.google.inject.TypeLiteral;
- import com.google.inject.multibindings.Multibinder;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.springframework.context.ApplicationContext;
  
  
  /**
@@@ -76,31 -69,11 +67,31 @@@ public class CoreModule  extends Abstra
          bind(EntityManagerFactory.class).toProvider( lazyEntityManagerFactoryProvider );
  
          install( new CommonModule());
 -        install(new CollectionModule());
 -        install(new GraphModule());
 -        install( new IndexModule() );
 -//        install(new MapModule());   TODO, re-enable when index module doesn't depend on queue
 -//        install(new QueueModule());
 +        install( new CollectionModule() {
 +            /**
 +             * configure our migration data provider for all entities in the system
 +             */
 +            @Override
 +           public void configureMigrationProvider() {
 +
 +                bind(new TypeLiteral< MigrationDataProvider<EntityIdScope>>(){}).to(
 +                    AllEntitiesInSystemImpl.class );
 +           }
 +        } );
 +        install( new GraphModule() {
 +
 +            /**
 +             * Override the observable that needs to be used for migration
 +             */
 +            @Override
 +            public void configureMigrationProvider() {
 +                bind( new TypeLiteral<MigrationDataProvider<GraphNode>>() {} ).to(
 +                    AllNodesInGraphImpl.class );
 +            }
 +        } );
 +        install(new IndexModule());
-         install(new MapModule());
-         install(new QueueModule());
++       //        install(new MapModule());   TODO, re-enable when index module doesn't depend on queue
++       //        install(new QueueModule());
  
          bind(ManagerCache.class).to( CpManagerCache.class );
  

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
index 65dacee,01848cb..95ff9dc
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
@@@ -47,18 -53,18 +54,20 @@@ import rx.schedulers.Schedulers
  public class EntityVersionDeletedHandler implements EntityVersionDeleted {
      private static final Logger logger = LoggerFactory.getLogger(EntityVersionDeletedHandler.class );
  
-     @Inject
-     private SerializationFig serializationFig;
+ 
+ 
+ 
+     private final EntityManagerFactory emf;
  
      @Inject
-     private EntityManagerFactory emf;
+     public EntityVersionDeletedHandler( final EntityManagerFactory emf ) {this.emf = emf;}
  
  
 +
      @Override
 -    public void versionDeleted(
 -            final CollectionScope scope, final Id entityId, final List<MvccEntity> entityVersions) {
 +    public void versionDeleted( final CollectionScope scope, final Id entityId,
 +                                final List<MvccLogEntry> entityVersions ) {
 +
  
          // This check is for testing purposes and for a test that to be able to dynamically turn
          // off and on delete previous versions so that it can test clean-up on read.
@@@ -87,19 -89,18 +92,19 @@@
                  scope.getName()
          );
  
-         rx.Observable.from( entityVersions )
-             .buffer(serializationFig.getBufferSize())
-             .map(new Func1<List<MvccLogEntry>, List<MvccLogEntry>>() {
+         Observable.from( entityVersions )
+             .collect( ei.createBatch(), new Action2<EntityIndexBatch, MvccEntity>() {
                  @Override
-                 public List<MvccLogEntry> call(List<MvccLogEntry> entityList) {
-                     for (MvccLogEntry entity : entityList) {
-                         eibatch.deindex(indexScope, entityId, entity.getVersion());
-                     }
-                     eibatch.execute();
-                     return entityList;
+                 public void call( final EntityIndexBatch entityIndexBatch, final MvccEntity mvccEntity ) {
+                     entityIndexBatch.deindex( indexScope, mvccEntity.getId(), mvccEntity.getVersion() );
                  }
-             }).toBlocking().last();
+             } ).doOnNext( new Action1<EntityIndexBatch>() {
+             @Override
+             public void call( final EntityIndexBatch entityIndexBatch ) {
+                 entityIndexBatch.execute();
+             }
+         } ).toBlocking().last();
      }
  
 +
  }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index d5478d4,a73d7a7..68ddab5
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@@ -21,19 -21,17 +21,14 @@@ package org.apache.usergrid.persistence
  import org.safehaus.guicyfig.GuicyFigModule;
  
  import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
- import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
- import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
- import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
- import org.apache.usergrid.persistence.collection.MvccEntity;
+ import org.apache.usergrid.persistence.collection.impl.EntityVersionTaskFactory;
 -import org.apache.usergrid.persistence.collection.MvccEntity;
  import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
  import org.apache.usergrid.persistence.collection.event.EntityDeleted;
  import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
  import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
  import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerFactoryImpl;
--import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
  import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGenerator;
  import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGeneratorImpl;
--import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
  import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
  import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
  import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index 10d85f8,23e375d..ac82181
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@@ -26,25 -25,25 +26,18 @@@ import java.util.concurrent.ExecutionEx
  import org.apache.usergrid.persistence.collection.CollectionScope;
  import org.apache.usergrid.persistence.collection.EntityCollectionManager;
  import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+ import org.apache.usergrid.persistence.collection.EntityCollectionManagerSync;
 +import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
 +import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
 +import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
  import org.apache.usergrid.persistence.collection.cache.CachedEntityCollectionManager;
  import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
--import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
- import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
- import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
- import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
 -import org.apache.usergrid.persistence.collection.guice.Write;
 -import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
+ import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 -import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 -import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
 -import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
  import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
  import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
  import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
  import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
  import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
- import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
- import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
--import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
--import org.apache.usergrid.persistence.core.guice.ProxyImpl;
 -import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
--import org.apache.usergrid.persistence.core.task.TaskExecutor;
  
  import com.google.common.base.Preconditions;
  import com.google.common.cache.CacheBuilder;
@@@ -74,25 -74,22 +67,24 @@@ public class EntityCollectionManagerFac
      private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
      private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
      private final Keyspace keyspace;
-     private final EntityVersionCleanupFactory entityVersionCleanupFactory;
-     private final EntityVersionCreatedFactory entityVersionCreatedFactory;
-     private final EntityDeletedFactory entityDeletedFactory;
+     private final EntityVersionTaskFactory entityVersionTaskFactory;
      private final TaskExecutor taskExecutor;
+     private final EntityCacheFig entityCacheFig;
+     private final MetricsFactory metricsFactory;
  
 +    private EntityCacheFig entityCacheFig;
 +    private SerializationFig serializationFig;
      private LoadingCache<CollectionScope, EntityCollectionManager> ecmCache =
          CacheBuilder.newBuilder().maximumSize( 1000 )
                      .build( new CacheLoader<CollectionScope, EntityCollectionManager>() {
                          public EntityCollectionManager load( CollectionScope scope ) {
  
                                    //create the target EM that will perform logic
 -                            final EntityCollectionManager target = new EntityCollectionManagerImpl( writeStart, writeUpdate, writeVerifyUnique,
 +                            final EntityCollectionManager target = new EntityCollectionManagerImpl(
 +                                writeStart, writeVerifyUnique,
                                  writeOptimisticVerify, writeCommit, rollback, markStart, markCommit,
                                  entitySerializationStrategy, uniqueValueSerializationStrategy,
-                                 mvccLogEntrySerializationStrategy, keyspace, serializationFig,entityVersionCleanupFactory,
-                                 entityVersionCreatedFactory, entityDeletedFactory, taskExecutor, scope );
 -                                mvccLogEntrySerializationStrategy, keyspace, entityVersionTaskFactory,
 -                                taskExecutor, scope, metricsFactory );
++                                mvccLogEntrySerializationStrategy, keyspace, serializationFig,entityVersionTaskFactory, taskExecutor, scope, metricsFactory );
  
  
                              final EntityCollectionManager proxy = new CachedEntityCollectionManager(entityCacheFig, target  );
@@@ -113,14 -111,13 +105,12 @@@
                                                 final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
                                                 final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
                                                 final Keyspace keyspace,
-                                                final EntityVersionCleanupFactory entityVersionCleanupFactory,
-                                                final EntityVersionCreatedFactory entityVersionCreatedFactory,
-                                                final EntityDeletedFactory entityDeletedFactory,
+                                                final EntityVersionTaskFactory entityVersionTaskFactory,
                                                 @CollectionTaskExecutor final TaskExecutor taskExecutor,
                                                final EntityCacheFig entityCacheFig,
-                                                final SerializationFig serializationFig) {
+                                                MetricsFactory metricsFactory) {
  
          this.writeStart = writeStart;
 -        this.writeUpdate = writeUpdate;
          this.writeVerifyUnique = writeVerifyUnique;
          this.writeOptimisticVerify = writeOptimisticVerify;
          this.writeCommit = writeCommit;
@@@ -131,20 -128,21 +121,18 @@@
          this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
          this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
          this.keyspace = keyspace;
-         this.entityVersionCleanupFactory = entityVersionCleanupFactory;
-         this.entityVersionCreatedFactory = entityVersionCreatedFactory;
-         this.entityDeletedFactory = entityDeletedFactory;
+         this.entityVersionTaskFactory = entityVersionTaskFactory;
          this.taskExecutor = taskExecutor;
          this.entityCacheFig = entityCacheFig;
-         this.serializationFig = serializationFig;
+         this.metricsFactory = metricsFactory;
      }
 -
 -
      @Override
 -    public EntityCollectionManager createCollectionManager( CollectionScope collectionScope ) {
 -        Preconditions.checkNotNull( collectionScope );
 -        try {
 -            return ecmCache.get( collectionScope );
 -        }
 -        catch ( ExecutionException ee ) {
 -            throw new RuntimeException( ee );
 +    public EntityCollectionManager createCollectionManager(CollectionScope collectionScope) {
 +        Preconditions.checkNotNull(collectionScope);
 +        try{
 +            return ecmCache.get(collectionScope);
 +        }catch (ExecutionException ee){
 +            throw new RuntimeException(ee);
          }
      }
  

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 19e8937,6ba4513..001e0db
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@@ -35,6 -34,9 +37,10 @@@ import org.apache.usergrid.persistence.
  import org.apache.usergrid.persistence.collection.MvccEntity;
  import org.apache.usergrid.persistence.collection.VersionSet;
  import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
++import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
+ import org.apache.usergrid.persistence.collection.guice.Write;
+ import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
+ import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
  import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
  import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtils;
  import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
@@@ -51,8 -51,9 +57,11 @@@ import org.apache.usergrid.persistence.
  import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
  import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
  import org.apache.usergrid.persistence.core.guice.ProxyImpl;
 +import org.apache.usergrid.persistence.core.task.Task;
 +import org.apache.usergrid.persistence.core.task.TaskExecutor;
+ import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+ import org.apache.usergrid.persistence.core.task.Task;
+ import org.apache.usergrid.persistence.core.task.TaskExecutor;
  import org.apache.usergrid.persistence.core.util.Health;
  import org.apache.usergrid.persistence.core.util.ValidationUtils;
  import org.apache.usergrid.persistence.model.entity.Entity;
@@@ -69,9 -72,14 +80,17 @@@ import com.netflix.astyanax.connectionp
  import com.netflix.astyanax.model.ColumnFamily;
  import com.netflix.astyanax.model.CqlResult;
  import com.netflix.astyanax.serializers.StringSerializer;
++import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
++import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
++import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
+ import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
+ import org.apache.usergrid.persistence.core.task.Task;
+ import org.apache.usergrid.persistence.core.task.TaskExecutor;
  
+ import rx.Notification;
  import rx.Observable;
  import rx.Subscriber;
+ import rx.functions.Action0;
  import rx.functions.Action1;
  import rx.functions.Func1;
  import rx.schedulers.Schedulers;
@@@ -288,6 -368,140 +375,89 @@@ public class EntityCollectionManagerImp
          } );
      }
  
+ 
+     /**
+      * Retrieves all entities that correspond to each field given in the Collection.
+      * @param fields
+      * @return
+      */
+     @Override
+     public Observable<FieldSet> getEntitiesFromFields( final Collection<Field> fields ) {
+         return rx.Observable.just(fields).map( new Func1<Collection<Field>, FieldSet>() {
+             @Override
+             public FieldSet call( Collection<Field> fields ) {
+                 try {
+ 
+                     final UUID startTime = UUIDGenerator.newTimeUUID();
+ 
+                     //Get back set of unique values that correspond to collection of fields
+                     UniqueValueSet set = uniqueValueSerializationStrategy.load( collectionScope, fields );
+ 
+                     //Short circuit if we don't have any uniqueValues from the given fields.
+                     if(!set.iterator().hasNext()){
+                         return new MutableFieldSet( 0 );
+                     }
+ 
+ 
+                     //loop through each field, and construct an entity load
+                     List<Id> entityIds = new ArrayList<>(fields.size());
+                     List<UniqueValue> uniqueValues = new ArrayList<>(fields.size());
+ 
+                     for(final Field expectedField: fields) {
+ 
+                         UniqueValue value = set.getValue(expectedField.getName());
+ 
+                         if(value ==null){
+                             logger.debug( "Field does not correspond to a unique value" );
+                         }
+ 
+                         entityIds.add(value.getEntityId());
+                         uniqueValues.add(value);
+                     }
+ 
+                     //Load an entity for each entityId we retrieved.
+                     final EntitySet entitySet = entitySerializationStrategy.load(collectionScope, entityIds, startTime);
+ 
+                     //now loop through and ensure the entities are there.
+                     final MutationBatch deleteBatch = keyspace.prepareMutationBatch();
+ 
+                     final MutableFieldSet response = new MutableFieldSet(fields.size());
+ 
+                     for(final UniqueValue expectedUnique: uniqueValues) {
+                         final MvccEntity entity = entitySet.getEntity(expectedUnique.getEntityId());
+ 
+                         //bad unique value, delete this, it's inconsistent
+                         if(entity == null || !entity.getEntity().isPresent()){
+                             final MutationBatch valueDelete = uniqueValueSerializationStrategy.delete(collectionScope, expectedUnique);
+                             deleteBatch.mergeShallow(valueDelete);
+                             continue;
+                         }
+ 
+ 
+                         //else add it to our result set
+                         response.addEntity(expectedUnique.getField(),entity);
+ 
+                     }
+ 
+                     //TODO: explore making this an Async process
+                     //We'll repair it again if we have to
+                     deleteBatch.execute();
+ 
+                     return response;
+ 
+ 
+                 }
+                 catch ( ConnectionException e ) {
+                     logger.error( "Failed to getIdField", e );
+                     throw new RuntimeException( e );
+                 }
+             }
+         } );
+     }
+ 
+ 
+ 
 -    @Override
 -    public Observable<Entity> update( final Entity entity ) {
 -
 -        logger.debug( "Starting update process" );
 -
 -        //do our input validation
 -        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
 -
 -        final Id entityId = entity.getId();
 -
 -
 -        ValidationUtils.verifyIdentity( entityId );
 -
 -        // create our observable and start the write
 -        CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity );
 -
 -
 -        Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeUpdate );
 -
 -
 -        final Timer.Context timer = updateTimer.time();
 -        return observable.map( writeCommit ).doOnNext(new Action1<Entity>() {
 -            @Override
 -            public void call(final Entity entity) {
 -                logger.debug("sending entity to the queue");
 -
 -                //we an update, signal the fix
 -                taskExecutor.submit( entityVersionTaskFactory.getCreatedTask( collectionScope, entity ));
 -
 -                taskExecutor.submit( entityVersionTaskFactory.getCleanupTask( collectionScope, entity.getId(), entity.getVersion(), false) );
 -                //TODO T.N Change this to fire a task
 -                //                Observable.from( new CollectionIoEvent<Id>(collectionScope,
 -                // entityId ) ).map( load ).subscribeOn( Schedulers.io() ).subscribe();
 -
 -
 -            }
 -        }).doOnError(rollback)
 -            .doOnNext(new Action1<Entity>() {
 -                @Override
 -                public void call(Entity entity) {
 -                    updateMeter.mark();
 -                }
 -            })
 -            .doOnCompleted(new Action0() {
 -                @Override
 -                public void call() {
 -                    timer.stop();
 -                }
 -            });
 -    }
 -
+ 
      // fire the stages
      public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
                                                                    WriteStart writeState ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
index dbe58d4,83f165d..5472645
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
@@@ -22,9 -22,8 +22,8 @@@ import com.google.inject.Inject
  import com.google.inject.assistedinject.Assisted;
  import com.netflix.astyanax.MutationBatch;
  import org.apache.usergrid.persistence.collection.CollectionScope;
- import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
  import org.apache.usergrid.persistence.collection.event.EntityDeleted;
 -import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 +import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
  import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
  import org.apache.usergrid.persistence.core.task.Task;
  import org.apache.usergrid.persistence.model.entity.Id;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 7c39ef4,65506ec..27c3db8
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@@ -20,9 -20,17 +20,18 @@@ package org.apache.usergrid.persistence
  
  import java.util.Iterator;
  import java.util.List;
 +import java.util.Set;
  import java.util.UUID;
  
+ import com.google.inject.Inject;
+ import com.google.inject.assistedinject.Assisted;
+ import org.apache.usergrid.persistence.collection.MvccEntity;
+ import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+ import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+ import org.apache.usergrid.persistence.collection.util.EntityUtils;
+ import org.apache.usergrid.persistence.model.entity.Entity;
+ import org.apache.usergrid.persistence.model.field.Field;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -70,6 -75,7 +79,7 @@@ public class EntityVersionCleanupTask i
      private final CollectionScope scope;
      private final Id entityId;
      private final UUID version;
 -    private final int numToSkip;
++    private final boolean includeVersion;
  
  
      @Inject
@@@ -89,6 -100,8 +103,8 @@@
          this.scope = scope;
          this.entityId = entityId;
          this.version = version;
+ 
 -        numToSkip = includeVersion? 0: 1;
++        includeVersion = includeVersion;
      }
  
  
@@@ -117,94 -130,79 +133,92 @@@
      @Override
      public Void call() throws Exception {
          //TODO Refactor this logic into a a class that can be invoked from anywhere
 -        //load every entity we have history of
 -        Observable<List<MvccEntity>> deleteFieldsObservable =
 -            Observable.create(new ObservableIterator<MvccEntity>("deleteColumns") {
 -                @Override
 -                protected Iterator<MvccEntity> getIterator() {
 -                    Iterator<MvccEntity> entities =  entitySerializationStrategy.loadDescendingHistory(
 -                        scope, entityId, version, 1000); // TODO: what fetchsize should we use here?
 -                    return entities;
 -                }
 -            })
 -            //buffer them for efficiency
 -            .skip(numToSkip)
 -            .buffer(serializationFig.getBufferSize()).doOnNext(
 -            new Action1<List<MvccEntity>>() {
 -                @Override
 -                public void call(final List<MvccEntity> mvccEntities) {
 -                    final MutationBatch batch = keyspace.prepareMutationBatch();
 -                    final MutationBatch entityBatch = keyspace.prepareMutationBatch();
 -                    final MutationBatch logBatch = keyspace.prepareMutationBatch();
--
 -                    for (MvccEntity mvccEntity : mvccEntities) {
 -                        final UUID entityVersion = mvccEntity.getVersion();
--
-         //iterate all unique values
 -
 -                        //if the entity is present process the fields
 -                        if(mvccEntity.getEntity().isPresent()) {
 -                            final Entity entity = mvccEntity.getEntity().get();
 -
 -                            //remove all unique fields from the index
 -                            for ( final Field field : EntityUtils.getUniqueFields(entity )) {
 -
 -                                final UniqueValue unique = new UniqueValueImpl( field, entityId, entityVersion );
 -                                final MutationBatch deleteMutation =
 -                                    uniqueValueSerializationStrategy.delete( scope, unique );
 -                                batch.mergeShallow( deleteMutation );
++       //iterate all unique values
 +        final BlockingObservable<Long> uniqueValueCleanup =
 +                Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) {
 +                    @Override
 +                    protected Iterator<UniqueValue> getIterator() {
 +                        return uniqueValueSerializationStrategy.getAllUniqueFields( scope, entityId );
 +                    }
 +                } )
 +
 +                        //skip current versions
 +                        .skipWhile( new Func1<UniqueValue, Boolean>() {
 +                            @Override
 +                            public Boolean call( final UniqueValue uniqueValue ) {
 +                                return version.equals( uniqueValue.getEntityVersion() );
                              }
 +                        } )
 +                                //buffer our buffer size, then roll them all up in a single batch mutation
 +                        .buffer( serializationFig.getBufferSize() ).doOnNext( new Action1<List<UniqueValue>>() {
 +                    @Override
 +                    public void call( final List<UniqueValue> uniqueValues ) {
 +                        final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
 +
 +
 +                        for ( UniqueValue value : uniqueValues ) {
 +                            uniqueCleanupBatch.mergeShallow( uniqueValueSerializationStrategy.delete( scope, value ) );
                          }
  
 -                        final MutationBatch entityDelete = entitySerializationStrategy
 -                                .delete(scope, entityId, mvccEntity.getVersion());
 -                        entityBatch.mergeShallow( entityDelete );
 -                        final MutationBatch logDelete = logEntrySerializationStrategy
 -                                .delete(scope, entityId, version);
 -                        logBatch.mergeShallow(logDelete);
 +                        try {
 +                            uniqueCleanupBatch.execute();
 +                        }
 +                        catch ( ConnectionException e ) {
 +                            throw new RuntimeException( "Unable to execute batch mutation", e );
 +                        }
                      }
 +                } ).subscribeOn( Schedulers.io() ).longCount().toBlocking();
  
 -                    try {
 -                        batch.execute();
 -                    } catch (ConnectionException e1) {
 -                        throw new RuntimeException("Unable to execute " +
 -                                "unique value " +
 -                                "delete", e1);
 -                    }
 -                    fireEvents(mvccEntities);
 -                    try {
 -                        entityBatch.execute();
 -                    } catch (ConnectionException e) {
 -                        throw new RuntimeException("Unable to delete entities in cleanup", e);
 +
 +        //start calling the listeners for remove log entries
 +        BlockingObservable<Long> versionsDeletedObservable =
 +
 +                Observable.create( new ObservableIterator<MvccLogEntry>( "Log entry iterator" ) {
 +                    @Override
 +                    protected Iterator<MvccLogEntry> getIterator() {
 +
 +                        return new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, version,
 +                                serializationFig.getBufferSize() );
                      }
 +                } )
 +                        //skip current version
 +                        .skipWhile( new Func1<MvccLogEntry, Boolean>() {
 +                            @Override
 +                            public Boolean call( final MvccLogEntry mvccLogEntry ) {
 +                                return version.equals( mvccLogEntry.getVersion() );
 +                            }
 +                        } )
 +                                //buffer them for efficiency
 +                        .buffer( serializationFig.getBufferSize() ).doOnNext( new Action1<List<MvccLogEntry>>() {
 +                    @Override
 +                    public void call( final List<MvccLogEntry> mvccEntities ) {
 +
 +                        fireEvents( mvccEntities );
 +
 +                        final MutationBatch logCleanupBatch = keyspace.prepareMutationBatch();
 +
 +
 +                        for ( MvccLogEntry entry : mvccEntities ) {
 +                            logCleanupBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, entry.getVersion() ));
 +                        }
  
 -                    try {
 -                        logBatch.execute();
 -                    } catch (ConnectionException e) {
 -                        throw new RuntimeException("Unable to delete entities from the log", e);
 +                        try {
 +                            logCleanupBatch.execute();
 +                        }
 +                        catch ( ConnectionException e ) {
 +                            throw new RuntimeException( "Unable to execute batch mutation", e );
 +                        }
                      }
 +                } ).subscribeOn( Schedulers.io() ).longCount().toBlocking();
 +
 +        //wait for this to complete
 +        final Long removedCount = uniqueValueCleanup.last();
  
 -                }
 -            }
 -        );
 +        logger.debug( "Removed unique values for {} entities of entity {}", removedCount, entityId );
  
 -        final int removedCount = deleteFieldsObservable.count().toBlocking().last();
 +        final Long versionCleanupCount = versionsDeletedObservable.last();
  
 -        logger.debug("Removed unique values for {} entities of entity {}",removedCount,entityId);
 +        logger.debug( "Removed {} previous entity versions of entity {}", versionCleanupCount, entityId );
  
          return null;
      }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
index 5b7898e,60275d9..d7a6407
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
@@@ -23,9 -22,9 +23,10 @@@ import java.util.Iterator
  
  import com.netflix.astyanax.MutationBatch;
  import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+ import com.netflix.astyanax.model.ConsistencyLevel;
  import org.apache.usergrid.persistence.collection.CollectionScope;
  import org.apache.usergrid.persistence.core.migration.schema.Migration;
 +import org.apache.usergrid.persistence.model.entity.Id;
  import org.apache.usergrid.persistence.model.field.Field;
  
  
@@@ -55,27 -53,23 +56,38 @@@ public interface UniqueValueSerializati
      /**
       * Load UniqueValue that matches field from collection or null if that value does not exist.
       *
 -     * @param colScope Collection scope in which to look for field name/value
 +     * @param collectionScope scope in which to look for field name/value
       * @param fields Field name/value to search for
 +     *
       * @return UniqueValueSet containing fields from the collection that exist in cassandra
 +     *
       * @throws ConnectionException on error connecting to Cassandra
       */
 -    public UniqueValueSet load( CollectionScope colScope, Collection<Field> fields ) throws ConnectionException;
 +    public UniqueValueSet load( CollectionScope collectionScope, Collection<Field> fields ) throws ConnectionException;
  
+     /**
 -     * Load UniqueValue that matches field from collection or null if that value does not exist.
 -     *
 -     * @param colScope Collection scope in which to look for field name/value
 -     * @param consistencyLevel Consistency level of query
 -     * @param fields Field name/value to search for
 -     * @return UniqueValueSet containing fields from the collection that exist in cassandra
 -     * @throws ConnectionException on error connecting to Cassandra
 -     */
++    * Load UniqueValue that matches field from collection or null if that value does not exist.
++    *
++    * @param colScope Collection scope in which to look for field name/value
++    * @param consistencyLevel Consistency level of query
++    * @param fields Field name/value to search for
++    * @return UniqueValueSet containing fields from the collection that exist in cassandra
++    * @throws ConnectionException on error connecting to Cassandra
++    */
+     public UniqueValueSet load( CollectionScope colScope, ConsistencyLevel consistencyLevel, Collection<Field> fields ) throws ConnectionException;
++
 +
 +    /**
 +     * Loads the currently persisted history of every unique value the entity has held.  This will
 +     * start from the max version and return values in descending version order.  Note that for entities
 +     * with more than one unique field, sequential fields can be returned with the same version.
 +     * @param collectionScope The scope the entity is stored in
 +     * @param entityId
 +     * @return
 +     */
 +    public Iterator<UniqueValue> getAllUniqueFields(CollectionScope collectionScope, Id entityId);
 +
 +
      /**
       * Delete the specified Unique Value from Cassandra.
       *

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
index 7aed8db,47372ae..108f2e8
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
@@@ -38,16 -36,8 +40,17 @@@ import org.apache.usergrid.persistence.
  import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
  import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
  import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
 +import org.apache.usergrid.persistence.collection.util.EntityUtils;
 +import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
 +import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
 +import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
 +import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
 +import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
 +import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
 +import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+ import org.apache.usergrid.persistence.core.migration.schema.Migration;
  import org.apache.usergrid.persistence.core.util.ValidationUtils;
 +import org.apache.usergrid.persistence.model.entity.Entity;
  import org.apache.usergrid.persistence.model.entity.Id;
  import org.apache.usergrid.persistence.model.field.Field;
  
@@@ -78,40 -65,23 +81,43 @@@ public class UniqueValueSerializationSt
  
      private static final EntityVersionSerializer ENTITY_VERSION_SER = new EntityVersionSerializer();
  
 -    private static final MultiTennantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Field>>, EntityVersion> CF_UNIQUE_VALUES =
 -            new MultiTennantColumnFamily<>( "Unique_Values", ROW_KEY_SER,
 -                    ENTITY_VERSION_SER );
 +    private static final MultiTennantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Field>>, EntityVersion>
 +        CF_UNIQUE_VALUES = new MultiTennantColumnFamily<>( "Unique_Values", ROW_KEY_SER, ENTITY_VERSION_SER );
 +
 +
++
 +    private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
 +
 +
 +    private static final CollectionScopedRowKeySerializer<Id> ENTITY_ROW_KEY_SER =
 +        new CollectionScopedRowKeySerializer<>( ID_SER );
 +
  
 +
 +    private static final MultiTennantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Id>>, UniqueFieldEntry>
 +        CF_ENTITY_UNIQUE_VALUES =
 +        new MultiTennantColumnFamily<>( "Entity_Unique_Values", ENTITY_ROW_KEY_SER, UniqueFieldEntrySerializer.get() );
 +
 +    public static final int COL_VALUE = 0x0;
 +
 +
-     private final Keyspace keyspace;
 +    private final SerializationFig serializationFig;
+     protected final Keyspace keyspace;
 -    private final CassandraFig cassandraFig;
++       private final CassandraFig cassandraFig;
 +
  
  
      /**
       * Construct serialization strategy for keyspace.
       *
       * @param keyspace Keyspace in which to store Unique Values.
 +     * @param serializationFig
       */
      @Inject
-     public UniqueValueSerializationStrategyImpl( final Keyspace keyspace, final SerializationFig serializationFig ) {
 -    public UniqueValueSerializationStrategyImpl( final Keyspace keyspace, final CassandraFig cassandraFig) {
 -        this.cassandraFig = cassandraFig;
++    public UniqueValueSerializationStrategyImpl( final Keyspace keyspace, final CassandraFig cassandraFig, final  SerializationFig serializationFig ) {
          this.keyspace = keyspace;
++        this.cassandraFig = cassandraFig;
 +        this.serializationFig = serializationFig;
      }
  
  

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java
index beff3bd,cf964a3..20edb66
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java
@@@ -2,9 -2,10 +2,10 @@@ package org.apache.usergrid.persistence
  
  
  import java.util.ArrayList;
- import java.util.Collection;
  import java.util.List;
 -import java.util.Set;
++import java.util.Collection;
  import java.util.UUID;
+ import org.apache.usergrid.persistence.model.field.Field;
  
  import org.apache.commons.lang3.reflect.FieldUtils;
  

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 8f78799,f19d613..246bbca
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@@ -96,7 -126,8 +126,8 @@@ public class GraphManagerImpl implement
                               final GraphFig graphFig,
                               final EdgeDeleteListener edgeDeleteListener,
                               final NodeDeleteListener nodeDeleteListener,
-                              final ApplicationScope scope) {
 -                             @Assisted final ApplicationScope scope,
++                             final ApplicationScope scope,
+                              MetricsFactory metricsFactory) {
  
  
          ValidationUtils.validateApplicationScope( scope );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 7fccd11,50b994d..27fe705
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@@ -19,8 -19,12 +19,9 @@@
  package org.apache.usergrid.persistence.index.guice;
  
  
- import org.apache.usergrid.persistence.core.guice.CommonModule;
+ import org.apache.usergrid.persistence.collection.guice.CollectionModule;
  import org.apache.usergrid.persistence.core.guice.TestModule;
+ import org.apache.usergrid.persistence.core.guice.CommonModule;
 -import org.apache.usergrid.persistence.index.impl.BufferQueue;
 -import org.apache.usergrid.persistence.index.impl.BufferQueueInMemoryImpl;
 -import org.apache.usergrid.persistence.index.impl.BufferQueueSQSImpl;
  
  
  public class TestIndexModule extends TestModule {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index e4043d2,ca9bf79..9cb5297
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@@ -24,10 -24,14 +24,12 @@@ import java.util.*
  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.atomic.AtomicLong;
  
+ import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
  import org.apache.usergrid.persistence.index.*;
 -import org.apache.usergrid.persistence.model.field.ArrayField;
 -import org.apache.usergrid.persistence.model.field.EntityObjectField;
 -import org.apache.usergrid.persistence.model.field.UUIDField;
 +import org.apache.usergrid.persistence.model.field.*;
  import org.apache.usergrid.persistence.model.field.value.EntityObject;
  import org.junit.Ignore;
+ import org.junit.Rule;
  import org.junit.Test;
  import org.junit.runner.RunWith;
  import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/pom.xml
----------------------------------------------------------------------


[2/2] incubator-usergrid git commit: Second pass at fixing the merge

Posted by to...@apache.org.
Second pass at fixing the merge


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/762e43e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/762e43e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/762e43e1

Branch: refs/heads/USERGRID-405-merge
Commit: 762e43e1ee9dfda1a07b9ba6ea7cd13cede06afc
Parents: 7b5d222
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 17 17:26:25 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 17 17:26:25 2015 -0600

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    | 37 +++++------
 .../corepersistence/CpEntityManagerFactory.java | 21 +++----
 .../events/EntityVersionDeletedHandler.java     |  8 +--
 .../collection/EntityCollectionManager.java     |  6 --
 .../EntityCollectionManagerFactoryImpl.java     | 22 ++++---
 .../impl/EntityCollectionManagerImpl.java       | 65 ++++++++------------
 .../impl/EntityVersionCleanupTask.java          |  9 +--
 .../MvccEntitySerializationStrategyV1Impl.java  |  2 +-
 .../MvccEntitySerializationStrategyV3Impl.java  |  1 -
 .../migration/MvccEntityDataMigrationImpl.java  |  8 +--
 .../collection/EntityCollectionManagerIT.java   |  2 +-
 .../impl/EntityVersionCleanupTaskTest.java      | 13 ++--
 ...ntitySerializationStrategyProxyV1_3Test.java |  3 +
 ...ntitySerializationStrategyProxyV2_3Test.java |  3 +
 ...ccEntitySerializationStrategyV1ImplTest.java |  2 +
 .../impl/GraphManagerFactoryImpl.java           | 17 ++---
 16 files changed, 104 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/762e43e1/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 5da7067..2e9b780 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -15,30 +15,37 @@
  */
 package org.apache.usergrid.corepersistence;
 
-import com.google.inject.AbstractModule;
-import com.google.inject.Provider;
-import com.google.inject.multibindings.Multibinder;
 
-import org.apache.usergrid.corepersistence.migration.EntityDataMigration;
-import org.apache.usergrid.corepersistence.migration.EntityTypeMappingMigration;
-import org.apache.usergrid.corepersistence.migration.GraphShardVersionMigration;
+import org.springframework.context.ApplicationContext;
+
 import org.apache.usergrid.corepersistence.events.EntityDeletedHandler;
 import org.apache.usergrid.corepersistence.events.EntityVersionCreatedHandler;
 import org.apache.usergrid.corepersistence.events.EntityVersionDeletedHandler;
+import org.apache.usergrid.corepersistence.migration.CoreMigration;
+import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin;
+import org.apache.usergrid.corepersistence.migration.EntityTypeMappingMigration;
+import org.apache.usergrid.corepersistence.migration.MigrationModuleVersionPlugin;
+import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservableImpl;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemImpl;
+import org.apache.usergrid.corepersistence.rx.impl.AllNodesInGraphImpl;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 import org.apache.usergrid.persistence.collection.event.EntityDeleted;
 import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
 import org.apache.usergrid.persistence.collection.guice.CollectionModule;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.guice.CommonModule;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin;
 import org.apache.usergrid.persistence.graph.guice.GraphModule;
+import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode;
 import org.apache.usergrid.persistence.index.guice.IndexModule;
-import org.apache.usergrid.persistence.map.guice.MapModule;
-import org.apache.usergrid.persistence.queue.guice.QueueModule;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.context.ApplicationContext;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provider;
+import com.google.inject.TypeLiteral;
+import com.google.inject.multibindings.Multibinder;
 
 
 /**
@@ -74,7 +81,7 @@ public class CoreModule  extends AbstractModule {
             @Override
            public void configureMigrationProvider() {
 
-                bind(new TypeLiteral< MigrationDataProvider<EntityIdScope>>(){}).to(
+                bind(new TypeLiteral<MigrationDataProvider<EntityIdScope>>(){}).to(
                     AllEntitiesInSystemImpl.class );
            }
         } );
@@ -95,12 +102,6 @@ public class CoreModule  extends AbstractModule {
 
         bind(ManagerCache.class).to( CpManagerCache.class );
 
-        Multibinder<DataMigration> dataMigrationMultibinder =
-                Multibinder.newSetBinder( binder(), DataMigration.class );
-        dataMigrationMultibinder.addBinding().to( EntityTypeMappingMigration.class );
-        dataMigrationMultibinder.addBinding().to( GraphShardVersionMigration.class );
-        dataMigrationMultibinder.addBinding().to( EntityDataMigration.class );
-
         Multibinder<EntityDeleted> entityBinder =
             Multibinder.newSetBinder(binder(), EntityDeleted.class);
         entityBinder.addBinding().to(EntityDeletedHandler.class);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/762e43e1/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 01dfa1f..f76b9fc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -15,15 +15,14 @@
  */
 package org.apache.usergrid.corepersistence;
 
-import com.google.common.base.Optional;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import static java.lang.String.CASE_INSENSITIVE_ORDER;
 
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
@@ -48,9 +47,8 @@ import org.apache.usergrid.persistence.cassandra.Setup;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
@@ -59,7 +57,6 @@ import org.apache.usergrid.persistence.entities.Application;
 import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsException;
 import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
 import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
-import org.apache.usergrid.persistence.exceptions.OrganizationAlreadyExistsException;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
@@ -71,13 +68,13 @@ import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.utils.UUIDUtils;
 
+import com.google.common.base.Optional;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.google.inject.TypeLiteral;
-import com.yammer.metrics.annotation.Metered;
 
 import rx.Observable;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/762e43e1/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
index 95ff9dc..c45949b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
@@ -85,7 +85,7 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
 
         CpEntityManagerFactory cpemf = (CpEntityManagerFactory)emf;
 
-        final EntityIndex ei = cpemf.getManagerCache().getEntityIndex(scope);
+        final EntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope );
 
         final IndexScope indexScope = new IndexScopeImpl(
                 new SimpleId(scope.getOwner().getUuid(), scope.getOwner().getType()),
@@ -93,10 +93,10 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
         );
 
         Observable.from( entityVersions )
-            .collect( ei.createBatch(), new Action2<EntityIndexBatch, MvccEntity>() {
+            .collect( ei.createBatch(), new Action2<EntityIndexBatch, MvccLogEntry>() {
                 @Override
-                public void call( final EntityIndexBatch entityIndexBatch, final MvccEntity mvccEntity ) {
-                    entityIndexBatch.deindex( indexScope, mvccEntity.getId(), mvccEntity.getVersion() );
+                public void call( final EntityIndexBatch entityIndexBatch, final MvccLogEntry mvccLogEntry ) {
+                    entityIndexBatch.deindex( indexScope, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion() );
                 }
             } ).doOnNext( new Action1<EntityIndexBatch>() {
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/762e43e1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index 35fc5d4..6532ee6 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -75,12 +75,6 @@ public interface EntityCollectionManager {
      */
     public Observable<EntitySet> load(Collection<Id> entityIds);
 
-    /**
-     * Takes the change and reloads an entity with all changes applied in this entity applied.
-     * The resulting entity from calling load will be the previous version of this entity plus
-     * the entity in this object applied to it.
-     */
-    public Observable<Entity> update ( Entity entity );
 
     /**
      * Returns health of entity data store.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/762e43e1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index ac82181..409467c 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -26,18 +26,23 @@ import java.util.concurrent.ExecutionException;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerSync;
-import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
-import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
-import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
 import org.apache.usergrid.persistence.collection.cache.CachedEntityCollectionManager;
 import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.core.guice.ProxyImpl;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
 
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
@@ -45,6 +50,7 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.google.inject.assistedinject.Assisted;
 import com.netflix.astyanax.Keyspace;
 
 
@@ -72,25 +78,21 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
     private final EntityCacheFig entityCacheFig;
     private final MetricsFactory metricsFactory;
 
-    private EntityCacheFig entityCacheFig;
-    private SerializationFig serializationFig;
     private LoadingCache<CollectionScope, EntityCollectionManager> ecmCache =
         CacheBuilder.newBuilder().maximumSize( 1000 )
                     .build( new CacheLoader<CollectionScope, EntityCollectionManager>() {
                         public EntityCollectionManager load( CollectionScope scope ) {
-
                                   //create the target EM that will perform logic
                             final EntityCollectionManager target = new EntityCollectionManagerImpl(
                                 writeStart, writeVerifyUnique,
                                 writeOptimisticVerify, writeCommit, rollback, markStart, markCommit,
                                 entitySerializationStrategy, uniqueValueSerializationStrategy,
-                                mvccLogEntrySerializationStrategy, keyspace, serializationFig,entityVersionTaskFactory, taskExecutor, scope, metricsFactory );
+                                mvccLogEntrySerializationStrategy, keyspace,entityVersionTaskFactory, taskExecutor, scope, metricsFactory );
 
 
                             final EntityCollectionManager proxy = new CachedEntityCollectionManager(entityCacheFig, target  );
 
                             return proxy;
-//                            return target;
                         }
                     } );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/762e43e1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 001e0db..f565fab 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -19,28 +19,22 @@
 package org.apache.usergrid.persistence.collection.impl;
 
 
-import java.util.*;
-
-import com.netflix.astyanax.MutationBatch;
-import org.apache.usergrid.persistence.collection.*;
-import org.apache.usergrid.persistence.collection.serialization.impl.MutableFieldSet;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
 import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
-import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
+import org.apache.usergrid.persistence.collection.FieldSet;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.VersionSet;
 import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
-import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
-import org.apache.usergrid.persistence.collection.guice.Write;
-import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtils;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
@@ -52,13 +46,11 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimist
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import org.apache.usergrid.persistence.collection.serialization.impl.MutableFieldSet;
 import org.apache.usergrid.persistence.core.guice.ProxyImpl;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.core.task.TaskExecutor;
@@ -75,17 +67,12 @@ import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.OperationResult;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.model.ColumnFamily;
 import com.netflix.astyanax.model.CqlResult;
 import com.netflix.astyanax.serializers.StringSerializer;
-import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
-import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
-import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
-import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
 
 import rx.Notification;
 import rx.Observable;
@@ -224,19 +211,19 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                     entity.getVersion(), false ));
                 //post-processing to come later. leave it empty for now.
             }
-        }).doOnError(rollback)
-            .doOnEach(new Action1<Notification<? super Entity>>() {
+        }).doOnError( rollback )
+            .doOnEach( new Action1<Notification<? super Entity>>() {
                 @Override
-                public void call(Notification<? super Entity> notification) {
+                public void call( Notification<? super Entity> notification ) {
                     writeMeter.mark();
                 }
-            })
-            .doOnCompleted(new Action0() {
+            } )
+            .doOnCompleted( new Action0() {
                 @Override
                 public void call() {
                     timer.stop();
                 }
-            });
+            } );
     }
 
 
@@ -250,7 +237,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         final Timer.Context timer = deleteTimer.time();
         Observable<Id> o = Observable.from(new CollectionIoEvent<Id>(collectionScope, entityId))
             .map(markStart)
-            .doOnNext(markCommit)
+            .doOnNext( markCommit )
             .map(new Func1<CollectionIoEvent<MvccEntity>, Id>() {
 
                      @Override
@@ -269,12 +256,12 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                     deleteMeter.mark();
                 }
             })
-            .doOnCompleted(new Action0() {
+            .doOnCompleted( new Action0() {
                 @Override
                 public void call() {
                     timer.stop();
                 }
-            });
+            } );
 
         return o;
     }
@@ -300,18 +287,18 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                 return Observable.from(entity.getEntity().get());
             }
         })
-            .doOnNext(new Action1<Entity>() {
+            .doOnNext( new Action1<Entity>() {
                 @Override
-                public void call(Entity entity) {
+                public void call( Entity entity ) {
                     loadMeter.mark();
                 }
-            })
-            .doOnCompleted(new Action0() {
+            } )
+            .doOnCompleted( new Action0() {
                 @Override
                 public void call() {
                     timer.stop();
                 }
-            });
+            } );
     }
 
 
@@ -347,12 +334,12 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                     loadMeter.mark();
                 }
             })
-            .doOnCompleted(new Action0() {
+            .doOnCompleted( new Action0() {
                 @Override
                 public void call() {
                     timer.stop();
                 }
-            });
+            } );
     }
 
 
@@ -505,12 +492,12 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                 }
             }
         } )
-            .doOnCompleted(new Action0() {
+            .doOnCompleted( new Action0() {
                 @Override
                 public void call() {
                     timer.stop();
                 }
-            });
+            } );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/762e43e1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 27c3db8..55d135b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -26,10 +26,12 @@ import java.util.UUID;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
 import org.apache.usergrid.persistence.collection.util.EntityUtils;
+import org.apache.usergrid.persistence.core.guice.ProxyImpl;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.field.Field;
 import org.slf4j.Logger;
@@ -86,7 +88,6 @@ public class EntityVersionCleanupTask implements Task<Void> {
     public EntityVersionCleanupTask(
         final SerializationFig serializationFig,
         final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
-        @ProxyImpl final MvccEntitySerializationStrategy   entitySerializationStrategy,
         final UniqueValueSerializationStrategy  uniqueValueSerializationStrategy,
         final Keyspace                          keyspace,
         final Set<EntityVersionDeleted>         listeners, // MUST be a set or Guice will not inject
@@ -104,7 +105,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
         this.entityId = entityId;
         this.version = version;
 
-        includeVersion = includeVersion;
+        this.includeVersion = includeVersion;
     }
 
 
@@ -146,7 +147,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
                         .skipWhile( new Func1<UniqueValue, Boolean>() {
                             @Override
                             public Boolean call( final UniqueValue uniqueValue ) {
-                                return version.equals( uniqueValue.getEntityVersion() );
+                                return !includeVersion && version.equals( uniqueValue.getEntityVersion() );
                             }
                         } )
                                 //buffer our buffer size, then roll them all up in a single batch mutation
@@ -185,7 +186,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
                         .skipWhile( new Func1<MvccLogEntry, Boolean>() {
                             @Override
                             public Boolean call( final MvccLogEntry mvccLogEntry ) {
-                                return version.equals( mvccLogEntry.getVersion() );
+                                return !includeVersion && version.equals( mvccLogEntry.getVersion() );
                             }
                         } )
                                 //buffer them for efficiency

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/762e43e1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
index 6f8525c..40054ec 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
@@ -194,7 +194,7 @@ public class MvccEntitySerializationStrategyV1Impl extends MvccEntitySerializati
             // it's been deleted, remove it
 
             if ( Arrays.equals( STATE_DELETED, state ) ) {
-                return new EntityWrapper( MvccEntity.Status.COMPLETE, Optional.<Entity>absent() );
+                return new EntityWrapper( MvccEntity.Status.DELETED, Optional.<Entity>absent() );
             }
 
             Entity storedEntity;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/762e43e1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
index 6ad18bb..ace076b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
@@ -310,7 +310,6 @@ public class MvccEntitySerializationStrategyV3Impl implements MvccEntitySerializ
         Preconditions.checkNotNull( version, "version is required" );
 
 
-        TimeUUIDUtils.getMicrosTimeFromUUID(version);
         return doWrite( collectionScope, entityId, version, new RowOp() {
             @Override
             public void doOp( final ColumnListMutation<Boolean> colMutation ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/762e43e1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
index beaaef9..bd4eafc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
@@ -30,9 +30,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.impl.EntityVersionCleanupTask;
+import org.apache.usergrid.persistence.collection.impl.EntityVersionTaskFactory;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
@@ -75,7 +75,7 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
     private final Keyspace keyspace;
     private final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions;
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
-    private final EntityVersionCleanupFactory entityVersionCleanupFactory;
+    private final EntityVersionTaskFactory entityVersionCleanupFactory;
     private final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3;
 
 
@@ -84,7 +84,7 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
     public MvccEntityDataMigrationImpl( final Keyspace keyspace,
                                         final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions,
                                         final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
-                                        final EntityVersionCleanupFactory entityVersionCleanupFactory,
+                                        final EntityVersionTaskFactory entityVersionCleanupFactory,
                                         final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3
                                       ) {
 
@@ -227,7 +227,7 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
                                             totalBatch.mergeShallow( mb );
                                         }
 
-                                        final EntityVersionCleanupTask task = entityVersionCleanupFactory.getTask( message.scope, message.entity.getId(), version );
+                                        final EntityVersionCleanupTask task = entityVersionCleanupFactory.getCleanupTask( message.scope, message.entity.getId(), version, false );
 
                                         entityVersionCleanupTasks.add( task );
                                     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/762e43e1/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
index c92bb74..f6d9782 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
 import org.apache.usergrid.persistence.core.guice.ProxyImpl;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/762e43e1/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
index b4921a3..787ab30 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@ -128,7 +128,7 @@ public class EntityVersionCleanupTaskTest {
 
         EntityVersionCleanupTask cleanupTask =
                 new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
-                        version );
+                        version, false );
 
         final MutationBatch newBatch = mock( MutationBatch.class );
 
@@ -209,8 +209,7 @@ public class EntityVersionCleanupTaskTest {
 
 
         EntityVersionCleanupTask cleanupTask =
-                new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
-                        version );
+                new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId, version, false );
 
         final MutationBatch newBatch = mock( MutationBatch.class );
 
@@ -290,7 +289,7 @@ public class EntityVersionCleanupTaskTest {
 
         EntityVersionCleanupTask cleanupTask =
                 new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
-                        version );
+                        version, false );
 
         final MutationBatch newBatch = mock( MutationBatch.class );
 
@@ -390,7 +389,7 @@ public class EntityVersionCleanupTaskTest {
 
         EntityVersionCleanupTask cleanupTask =
                 new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
-                        version );
+                        version, false );
 
         final MutationBatch newBatch = mock( MutationBatch.class );
 
@@ -517,7 +516,7 @@ public class EntityVersionCleanupTaskTest {
 
         EntityVersionCleanupTask cleanupTask =
                 new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
-                        version );
+                        version, false);
 
         final MutationBatch newBatch = mock( MutationBatch.class );
 
@@ -645,7 +644,7 @@ public class EntityVersionCleanupTaskTest {
 
         EntityVersionCleanupTask cleanupTask =
                 new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
-                        version );
+                        version, false );
 
         final MutationBatch newBatch = mock( MutationBatch.class );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/762e43e1/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1_3Test.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1_3Test.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1_3Test.java
index 42b10c4..56abfd8 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1_3Test.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1_3Test.java
@@ -34,9 +34,12 @@ import org.apache.usergrid.persistence.core.test.UseModules;
 
 import com.google.inject.Inject;
 
+import net.jcip.annotations.NotThreadSafe;
+
 
 @RunWith( ITRunner.class )
 @UseModules( TestCollectionModule.class )
+@NotThreadSafe//anything that changes the system version state is not safe to be run concurrently
 public class MvccEntitySerializationStrategyProxyV1_3Test extends MvccEntitySerializationStrategyImplTest {
 
     @Inject

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/762e43e1/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2_3Test.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2_3Test.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2_3Test.java
index da496f7..b372fbf 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2_3Test.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2_3Test.java
@@ -34,9 +34,12 @@ import org.apache.usergrid.persistence.core.test.UseModules;
 
 import com.google.inject.Inject;
 
+import net.jcip.annotations.NotThreadSafe;
+
 
 @RunWith( ITRunner.class )
 @UseModules( TestCollectionModule.class )
+@NotThreadSafe//anything that changes the system version state is not safe to be run concurrently
 public class MvccEntitySerializationStrategyProxyV2_3Test extends MvccEntitySerializationStrategyV2Test {
 
     @Inject

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/762e43e1/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1ImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1ImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1ImplTest.java
index 4e0c8cc..b0dba5d 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1ImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1ImplTest.java
@@ -49,6 +49,8 @@ import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
+import net.jcip.annotations.NotThreadSafe;
+
 
 @RunWith( ITRunner.class )
 @UseModules( TestCollectionModule.class )

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/762e43e1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/GraphManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/GraphManagerFactoryImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/GraphManagerFactoryImpl.java
index 732f0bf..7c88c33 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/GraphManagerFactoryImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/GraphManagerFactoryImpl.java
@@ -26,6 +26,8 @@ import com.google.common.cache.LoadingCache;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.google.inject.assistedinject.Assisted;
+
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.GraphManager;
@@ -51,29 +53,28 @@ public class GraphManagerFactoryImpl implements GraphManagerFactory {
     private final GraphFig graphFig;
     private final EdgeDeleteListener edgeDeleteListener;
     private final NodeDeleteListener nodeDeleteListener;
+    private final MetricsFactory metricsFactory;
 
     private LoadingCache<ApplicationScope, GraphManager> gmCache =
         CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<ApplicationScope, GraphManager>() {
             public GraphManager load(
                 ApplicationScope scope ) {
-                return new GraphManagerImpl(edgeMetadataSerialization,edgeSerialization,nodeSerialization,graphFig,edgeDeleteListener,nodeDeleteListener,scope);
+                return new GraphManagerImpl(edgeMetadataSerialization,edgeSerialization,nodeSerialization,graphFig,edgeDeleteListener,nodeDeleteListener,scope, metricsFactory);
             }
         } );
 
     @Inject
-    public GraphManagerFactoryImpl(final EdgeMetadataSerialization edgeMetadataSerialization,
-                                   final EdgeSerialization edgeSerialization,
-                                   final NodeSerialization nodeSerialization,
-                                   final GraphFig graphFig,
-                                   final EdgeDeleteListener edgeDeleteListener,
-                                   final NodeDeleteListener nodeDeleteListener
-    ){
+    public GraphManagerFactoryImpl( final EdgeMetadataSerialization edgeMetadataSerialization, final
+    EdgeSerialization edgeSerialization,
+                                    final NodeSerialization nodeSerialization, final GraphFig graphFig, final EdgeDeleteListener edgeDeleteListener,
+                                    final NodeDeleteListener nodeDeleteListener, final MetricsFactory metricsFactory ){
         this.edgeMetadataSerialization = edgeMetadataSerialization;
         this.edgeSerialization = edgeSerialization;
         this.nodeSerialization = nodeSerialization;
         this.graphFig = graphFig;
         this.edgeDeleteListener = edgeDeleteListener;
         this.nodeDeleteListener = nodeDeleteListener;
+        this.metricsFactory = metricsFactory;
     }
 
     @Override