You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/10 19:08:24 UTC

[1/4] incubator-usergrid git commit: pushing migrations into core persistence

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-365 13aa17e4b -> 3ed0221f2 (forced update)


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
index ce8d6cf..6059569 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
@@ -25,32 +25,45 @@ package org.apache.usergrid.persistence.graph.serialization.impl;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.guice.V1Impl;
 import org.apache.usergrid.persistence.core.guice.V2Impl;
-import org.apache.usergrid.persistence.core.guice.V3Impl;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.SearchEdgeType;
-import org.apache.usergrid.persistence.graph.SearchIdType;
+import org.apache.usergrid.persistence.graph.*;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMigrationStrategy;
+import org.apache.usergrid.persistence.graph.serialization.EdgesFromSourceObservable;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
 
 
 @Singleton
-public class EdgeMetadataSerializationProxyImpl implements EdgeMetadataSerialization {
+public class EdgeMetadataSerializationProxyImpl implements EdgeMetadataSerialization, EdgeMigrationStrategy {
 
-    public static final int MIGRATION_VERSION = 2;
+    private static final Logger logger = LoggerFactory.getLogger(EdgeMetadataSerializationProxyImpl.class);
 
     private final DataMigrationManager dataMigrationManager;
     private final Keyspace keyspace;
+    private final GraphManagerFactory graphManagerFactory;
+    private final EdgesFromSourceObservable edgesFromSourceObservable;
     private final EdgeMetadataSerialization previous;
     private final EdgeMetadataSerialization current;
 
@@ -59,11 +72,15 @@ public class EdgeMetadataSerializationProxyImpl implements EdgeMetadataSerializa
      * Handles routing data to the right implementation, based on the current system migration version
      */
     @Inject
-    public EdgeMetadataSerializationProxyImpl( final DataMigrationManager dataMigrationManager, final Keyspace keyspace,
-                                               @V1Impl final EdgeMetadataSerialization previous,
-                                               @V2Impl final EdgeMetadataSerialization current) {
+    public EdgeMetadataSerializationProxyImpl(final DataMigrationManager dataMigrationManager, final Keyspace keyspace,
+                                              final GraphManagerFactory graphManagerFactory,
+                                              final EdgesFromSourceObservable edgesFromSourceObservable,
+                                              @V1Impl final EdgeMetadataSerialization previous,
+                                              @V2Impl final EdgeMetadataSerialization current) {
         this.dataMigrationManager = dataMigrationManager;
         this.keyspace = keyspace;
+        this.graphManagerFactory = graphManagerFactory;
+        this.edgesFromSourceObservable = edgesFromSourceObservable;
         this.previous = previous;
         this.current = current;
     }
@@ -274,4 +291,85 @@ public class EdgeMetadataSerializationProxyImpl implements EdgeMetadataSerializa
     private boolean isOldVersion() {
         return dataMigrationManager.getCurrentVersion() < MIGRATION_VERSION;
     }
+
+    @Override
+    public MigrationRelationship<EdgeMetadataSerialization> getMigration() {
+        return new MigrationRelationship<>(previous,current);
+    }
+
+    @Override
+    public int getVersion() {
+        return EdgeMigrationStrategy.MIGRATION_VERSION;
+    }
+
+    @Override
+    public Observable<Long> migrate(final ApplicationEntityGroup applicationEntityGroup,
+                                             final DataMigration.ProgressObserver observer) {
+        final GraphManager gm = graphManagerFactory.createEdgeManager(applicationEntityGroup.applicationScope);
+        final Observable<Edge> edgesFromSource = edgesFromSourceObservable.edgesFromSource(gm, applicationEntityGroup.applicationScope.getApplication());
+
+        final AtomicLong counter = new AtomicLong();
+        rx.Observable o =
+            Observable
+                .from(applicationEntityGroup.entityIds)
+
+            .flatMap(new Func1<Id, Observable<List<Edge>>>() {
+                //for each id in the group, get it's edges
+                @Override
+                public Observable<List<Edge>> call(final Id id) {
+                    logger.info("Migrating edges from node {} in scope {}", id,
+                        applicationEntityGroup.applicationScope);
+
+
+                    //get each edge from this node as a source
+                    return edgesFromSource
+
+                        //for each edge, re-index it in v2  every 1000 edges or less
+                        .buffer(1000)
+                        .doOnNext(new Action1<List<Edge>>() {
+                            @Override
+                            public void call(List<Edge> edges) {
+                                final MutationBatch batch =
+                                    keyspace.prepareMutationBatch();
+
+                                for (Edge edge : edges) {
+                                    logger.info("Migrating meta for edge {}", edge);
+                                    final MutationBatch edgeBatch = getMigration().to()
+                                        .writeEdge(
+                                            applicationEntityGroup
+                                                .applicationScope,
+                                            edge);
+                                    batch.mergeShallow(edgeBatch);
+                                }
+
+                                try {
+                                    batch.execute();
+                                } catch (ConnectionException e) {
+                                    throw new RuntimeException(
+                                        "Unable to perform migration", e);
+                                }
+
+                                //update the observer so the admin can see it
+                                final long newCount =
+                                    counter.addAndGet(edges.size());
+
+                                observer.update(getVersion(), String.format(
+                                    "Currently running.  Rewritten %d edge types",
+                                    newCount));
+                            }
+                        });
+                }
+
+
+            })
+            .map(new Func1<List<Edge>, Long>() {
+                @Override
+                public Long call(List<Edge> edges) {
+                    return counter.get();
+                }
+            });
+        return o;
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesFromSourceObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesFromSourceObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesFromSourceObservableImpl.java
new file mode 100644
index 0000000..1725f2d
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesFromSourceObservableImpl.java
@@ -0,0 +1,62 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl;
+
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
+import org.apache.usergrid.persistence.graph.serialization.EdgesFromSourceObservable;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.functions.Func1;
+
+/**
+ * Emits the edges that are edges from the specified source node
+ */
+public class EdgesFromSourceObservableImpl implements EdgesFromSourceObservable {
+
+    private static final Logger logger = LoggerFactory.getLogger(EdgesFromSourceObservableImpl.class);
+    public EdgesFromSourceObservableImpl(){
+
+    }
+
+    /**
+     * Get all edges from the source
+     */
+    @Override
+    public  Observable<Edge> edgesFromSource( final GraphManager gm, final Id sourceNode){
+        Observable<String> edgeTypes = gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) );
+
+        return edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
+            @Override
+            public Observable<Edge> call( final String edgeType ) {
+
+                logger.debug( "Loading edges of edgeType {} from {}", edgeType, sourceNode );
+
+                return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE,
+                    SearchByEdgeType.Order.DESCENDING, null ) );
+            }
+        } );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesToTargetObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesToTargetObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesToTargetObservableImpl.java
new file mode 100644
index 0000000..d84b286
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesToTargetObservableImpl.java
@@ -0,0 +1,65 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl;
+
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
+import org.apache.usergrid.persistence.graph.serialization.EdgesToTargetObservable;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.functions.Func1;
+
+/**
+ * Classy class class.
+ */
+public class EdgesToTargetObservableImpl implements EdgesToTargetObservable {
+
+    public EdgesToTargetObservableImpl(){
+
+    }
+
+        private  final Logger logger = LoggerFactory.getLogger(EdgesToTargetObservableImpl.class);
+
+
+        /**
+         * Get all edges from the source
+         */
+        @Override
+        public  Observable<Edge> getEdgesToTarget(final GraphManager gm,  final Id targetNode) {
+            Observable<String> edgeTypes = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( targetNode, null, null ) );
+
+            return edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
+                @Override
+                public Observable<Edge> call( final String edgeType ) {
+
+                    logger.debug( "Loading edges of edgeType {} to {}", edgeType, targetNode);
+
+                    return gm.loadEdgesToTarget( new SimpleSearchByEdgeType( targetNode, edgeType, Long.MAX_VALUE,
+                        SearchByEdgeType.Order.DESCENDING, null ) );
+                }
+            } );
+        }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/GraphManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/GraphManagerFactoryImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/GraphManagerFactoryImpl.java
new file mode 100644
index 0000000..7c04cee
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/GraphManagerFactoryImpl.java
@@ -0,0 +1,88 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.impl.GraphManagerImpl;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteListener;
+import org.apache.usergrid.persistence.graph.impl.stage.NodeDeleteListener;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Classy class class.
+ */
+public class GraphManagerFactoryImpl implements GraphManagerFactory {
+
+    private final EdgeMetadataSerialization edgeMetadataSerialization;
+    private final EdgeSerialization edgeSerialization;
+    private final NodeSerialization nodeSerialization;
+    private final GraphFig graphFig;
+    private final EdgeDeleteListener edgeDeleteListener;
+    private final NodeDeleteListener nodeDeleteListener;
+
+    private LoadingCache<ApplicationScope, GraphManager> gmCache =
+        CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<ApplicationScope, GraphManager>() {
+            public GraphManager load(
+                ApplicationScope scope ) {
+                return new GraphManagerImpl(edgeMetadataSerialization,edgeSerialization,nodeSerialization,graphFig,edgeDeleteListener,nodeDeleteListener,scope);
+            }
+        } );
+
+    public GraphManagerFactoryImpl(final EdgeMetadataSerialization edgeMetadataSerialization,
+                                   final EdgeSerialization edgeSerialization,
+                                   final NodeSerialization nodeSerialization,
+                                   final GraphFig graphFig,
+                                   final EdgeDeleteListener edgeDeleteListener,
+                                   final NodeDeleteListener nodeDeleteListener
+    ){
+        this.edgeMetadataSerialization = edgeMetadataSerialization;
+        this.edgeSerialization = edgeSerialization;
+        this.nodeSerialization = nodeSerialization;
+        this.graphFig = graphFig;
+        this.edgeDeleteListener = edgeDeleteListener;
+        this.nodeDeleteListener = nodeDeleteListener;
+    }
+
+    @Override
+    public GraphManager createEdgeManager(ApplicationScope collectionScope) {
+        try {
+            return gmCache.get(collectionScope);
+        }catch (ExecutionException ee){
+            throw new RuntimeException(ee);
+        }
+    }
+
+    @Override
+    public void invalidate() {
+        gmCache.invalidateAll();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
new file mode 100644
index 0000000..bf0a03e
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
@@ -0,0 +1,70 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl;
+
+
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.serialization.EdgesFromSourceObservable;
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.functions.Func1;
+
+/**
+ * Emits the id of all nodes that are target nodes from the given source node
+ */
+public class TargetIdObservableImpl implements TargetIdObservable {
+
+    private static final Logger logger = LoggerFactory.getLogger(TargetIdObservableImpl.class);
+    private final EdgesFromSourceObservable edgesFromSourceObservable;
+
+    public TargetIdObservableImpl(final EdgesFromSourceObservable edgesFromSourceObservable){
+        this.edgesFromSourceObservable = edgesFromSourceObservable;
+    }
+
+    /**
+     * Get all nodes that are target nodes from the sourceNode
+     * @param gm
+     * @param sourceNode
+     *
+     * @return
+     */
+    @Override
+    public Observable<Id> getTargetNodes(final GraphManager gm, final Id sourceNode) {
+
+        //only search edge types that start with collections
+        return edgesFromSourceObservable.edgesFromSource(gm, sourceNode ).map( new Func1<Edge, Id>() {
+
+
+            @Override
+            public Id call( final Edge edge ) {
+                final Id targetNode = edge.getTargetNode();
+
+                logger.debug( "Emitting targetId of {}", edge );
+
+
+                return targetNode;
+            }
+        } );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetaDataSerializationProxyV1Test.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetaDataSerializationProxyV1Test.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetaDataSerializationProxyV1Test.java
index 50670b1..d6f6943 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetaDataSerializationProxyV1Test.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetaDataSerializationProxyV1Test.java
@@ -78,7 +78,7 @@ public class EdgeMetaDataSerializationProxyV1Test extends EdgeMetadataSerializat
     @Override
     protected EdgeMetadataSerialization getSerializationImpl() {
 
-        assertTrue( serialization instanceof EdgeMetadataSerializationProxyImpl );
+        assertTrue( serialization instanceof EdgeMetadataSerializationProxyImpl);
 
         return serialization;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetaDataSerializationProxyV2Test.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetaDataSerializationProxyV2Test.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetaDataSerializationProxyV2Test.java
index ea03b44..69fb72d 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetaDataSerializationProxyV2Test.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetaDataSerializationProxyV2Test.java
@@ -75,7 +75,7 @@ public class EdgeMetaDataSerializationProxyV2Test extends EdgeMetadataSerializat
 
     @Override
     protected EdgeMetadataSerialization getSerializationImpl() {
-        assertTrue(serialization instanceof EdgeMetadataSerializationProxyImpl );
+        assertTrue(serialization instanceof EdgeMetadataSerializationProxyImpl);
 
         return serialization;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
index e6d8125..07a2bb9 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
@@ -18,6 +18,8 @@
 package org.apache.usergrid.persistence.map;
 
 
+import org.apache.usergrid.persistence.core.CPManager;
+
 import java.util.UUID;
 
 
@@ -25,7 +27,7 @@ import java.util.UUID;
 /**
  * Generator of a map manager instance
  */
-public interface MapManager {
+public interface MapManager extends CPManager {
 
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManagerFactory.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManagerFactory.java
index a60cdfc..81531c9 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManagerFactory.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManagerFactory.java
@@ -27,4 +27,6 @@ public interface MapManagerFactory {
      * Get the map manager
      */
     public MapManager createMapManager( final MapScope scope );
+
+    void invalidate();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/guice/MapModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/guice/MapModule.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/guice/MapModule.java
index d167151..22d37c7 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/guice/MapModule.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/guice/MapModule.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.map.guice;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
 import org.apache.usergrid.persistence.map.MapManager;
 import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.impl.MapManagerFactoryImpl;
 import org.apache.usergrid.persistence.map.impl.MapManagerImpl;
 import org.apache.usergrid.persistence.map.impl.MapSerialization;
 import org.apache.usergrid.persistence.map.impl.MapSerializationImpl;
@@ -41,13 +42,8 @@ public class MapModule extends AbstractModule {
 
     @Override
     protected void configure() {
-
-        // create a guice factory for getting our collection manager
-        install( new FactoryModuleBuilder().implement( MapManager.class, MapManagerImpl.class )
-                                           .build( MapManagerFactory.class ) );
-
-
-        bind( MapSerialization.class).to( MapSerializationImpl.class );
+        bind(MapManagerFactory.class).to(MapManagerFactoryImpl.class);
+        bind(MapSerialization.class).to( MapSerializationImpl.class );
 
         Multibinder<Migration> migrationBinding = Multibinder.newSetBinder( binder(), Migration.class );
         migrationBinding.addBinding().to(  Key.get( MapSerialization.class ) );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerFactoryImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerFactoryImpl.java
new file mode 100644
index 0000000..f5b9527
--- /dev/null
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerFactoryImpl.java
@@ -0,0 +1,62 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.map.impl;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.netflix.astyanax.Execution;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Classy class class.
+ */
+public class MapManagerFactoryImpl implements MapManagerFactory {
+    private final MapSerialization mapSerialization;
+    private LoadingCache<MapScope, MapManager> mmCache =
+        CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<MapScope, MapManager>() {
+            public MapManager load( MapScope scope ) {
+                return  new MapManagerImpl(scope,mapSerialization);
+            }
+        } );
+
+    public MapManagerFactoryImpl(final MapSerialization mapSerialization){
+
+        this.mapSerialization = mapSerialization;
+    }
+
+    @Override
+    public MapManager createMapManager(MapScope scope) {
+        try{
+            return mmCache.get(scope);
+        }catch (ExecutionException ee){
+            throw new RuntimeException(ee);
+        }
+    }
+
+    @Override
+    public void invalidate() {
+        mmCache.invalidateAll();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index e221f0e..4222253 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.index;
 
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.core.CPManager;
 import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.index.query.Query;
 import org.apache.usergrid.persistence.index.query.CandidateResults;
@@ -32,10 +33,10 @@ import java.util.Map;
 /**
  * Provides indexing of Entities within a scope.
  */
-public interface EntityIndex {
+public interface EntityIndex extends CPManager {
 
     /**
-     * This should ONLY ever be called once on application create.  
+     * This should ONLY ever be called once on application create.
      * Otherwise we're introducing slowness into our system
      */
     public void initializeIndex();
@@ -95,12 +96,12 @@ public interface EntityIndex {
      * Check health of cluster.
      */
     public Health getClusterHealth();
-    
+
     /**
      * Check health of this specific index.
      */
     public Health getIndexHealth();
-    
+
     public void deleteIndex();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java
index 78a5137..10752d1 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java
@@ -25,6 +25,8 @@ import com.google.inject.assistedinject.Assisted;
 
 public interface EntityIndexFactory {
 
-    public EntityIndex createEntityIndex( 
+    public EntityIndex createEntityIndex(
         @Assisted ApplicationScope appScope);
+
+    void invalidate();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index edc938b..45813e1 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -24,6 +24,7 @@ import com.google.inject.AbstractModule;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.impl.EntityIndexFactoryImpl;
 import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl;
 import org.safehaus.guicyfig.GuicyFigModule;
 
@@ -36,9 +37,8 @@ public class IndexModule extends AbstractModule {
         // install our configuration
         install (new GuicyFigModule( IndexFig.class ));
 
-        install( new FactoryModuleBuilder()
-            .implement( EntityIndex.class, EsEntityIndexImpl.class )
-            .build( EntityIndexFactory.class ) );
+        bind(EntityIndexFactory.class).to(EntityIndexFactoryImpl.class);
+        bind(EntityIndex.class).to(EsEntityIndexImpl.class);
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EntityIndexFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EntityIndexFactoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EntityIndexFactoryImpl.java
new file mode 100644
index 0000000..e45c6c5
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EntityIndexFactoryImpl.java
@@ -0,0 +1,67 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.assistedinject.Assisted;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.IndexFig;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Classy class class.
+ */
+public class EntityIndexFactoryImpl implements EntityIndexFactory{
+
+    private final IndexFig config;
+    private final EsProvider provider;
+    private final EsIndexCache indexCache;
+
+    private LoadingCache<ApplicationScope, EntityIndex> eiCache =
+        CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<ApplicationScope, EntityIndex>() {
+            public EntityIndex load( ApplicationScope scope ) {
+                return new EsEntityIndexImpl(scope,config,provider,indexCache);
+            }
+        } );
+    public EntityIndexFactoryImpl(final IndexFig config, final EsProvider provider, final EsIndexCache indexCache){
+        this.config = config;
+        this.provider = provider;
+        this.indexCache = indexCache;
+    }
+
+    @Override
+    public EntityIndex createEntityIndex(final ApplicationScope appScope) {
+        try{
+            return eiCache.get(appScope);
+        }catch (ExecutionException ee){
+            throw new RuntimeException(ee);
+        }
+    }
+
+    @Override
+    public void invalidate() {
+        eiCache.invalidateAll();
+    }
+}


[4/4] incubator-usergrid git commit: pushing migrations into core persistence

Posted by sf...@apache.org.
pushing migrations into core persistence

pushing migrations into core persistence

moving dependencies to core persistence

moving dependencies to core persistence

moving dependencies to core persistence


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3ed0221f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3ed0221f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3ed0221f

Branch: refs/heads/USERGRID-365
Commit: 3ed0221f2c7fb95b827aff7a9b3827ffa045008e
Parents: 0f0c557
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Feb 5 17:55:38 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Feb 10 11:07:23 2015 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManagerFactory.java |  22 +--
 .../corepersistence/CpManagerCache.java         |  71 ++-------
 .../usergrid/corepersistence/GuiceModule.java   |  21 ++-
 .../usergrid/corepersistence/ManagerCache.java  |  12 +-
 .../migration/EntityDataMigration.java          | 146 ------------------
 .../migration/EntityTypeMappingMigration.java   |  60 ++++----
 .../migration/GraphShardVersionMigration.java   | 153 -------------------
 .../corepersistence/migration/Versions.java     |  19 ++-
 .../rx/AllEntitiesInSystemObservable.java       | 101 ------------
 .../rx/ApplicationObservable.java               | 128 ----------------
 .../rx/EdgesFromSourceObservable.java           |  63 --------
 .../rx/EdgesToTargetObservable.java             |  63 --------
 .../corepersistence/rx/TargetIdObservable.java  |  66 --------
 .../impl/AllEntitiesInSystemObservableImpl.java | 106 +++++++++++++
 .../rx/impl/ApplicationObservableImpl.java      | 135 ++++++++++++++++
 .../corepersistence/util/CpNamingUtils.java     |   2 +
 .../migration/EntityDataMigrationIT.java        |  61 +++++---
 .../migration/EntityTypeMappingMigrationIT.java |  88 ++++++-----
 .../migration/GraphShardVersionMigrationIT.java |  42 +++--
 .../rx/AllEntitiesInSystemObservableIT.java     |  14 +-
 .../rx/ApplicationObservableTestIT.java         |   7 +-
 .../rx/EdgesFromSourceObservableIT.java         |   4 +-
 .../rx/EdgesToTargetObservableIT.java           |   6 +-
 .../rx/TargetIdObservableTestIT.java            |   7 +-
 .../collection/EntityCollectionManager.java     |   8 +-
 .../EntityCollectionManagerFactory.java         |  22 +--
 .../collection/guice/CollectionModule.java      |   8 +-
 .../EntityCollectionManagerFactoryImpl.java     | 137 +++++++++++++++++
 .../impl/EntityCollectionManagerImpl.java       |   2 +-
 .../collection/impl/EntityDeletedTask.java      |  14 +-
 .../impl/EntityVersionCleanupTask.java          |   8 +-
 .../mvcc/MvccEntityMigrationStrategy.java       |  30 ++++
 .../mvcc/MvccEntitySerializationStrategy.java   | 102 -------------
 .../mvcc/stage/delete/MarkCommit.java           |   2 +-
 .../mvcc/stage/write/WriteCommit.java           |   4 +-
 .../serialization/impl/EntityRepairImpl.java    |   1 -
 .../impl/MvccEntitySerializationStrategy.java   | 101 ++++++++++++
 .../MvccEntitySerializationStrategyImpl.java    |   1 -
 .../MvccEntitySerializationStrategyProxy.java   |  98 ++++++++++--
 ...cEntitySerializationStrategyProxyV1Impl.java |  32 ++--
 ...cEntitySerializationStrategyProxyV2Impl.java |  27 ++--
 .../MvccEntitySerializationStrategyV3Impl.java  |   8 -
 .../serialization/impl/SerializationModule.java |  23 +--
 .../impl/EntityVersionCleanupTaskTest.java      |  53 ++++---
 .../mvcc/stage/delete/MarkCommitTest.java       |   2 +-
 .../mvcc/stage/write/WriteCommitTest.java       |   2 +-
 .../serialization/EntityRepairImplTest.java     |   2 +-
 ...MvccEntitySerializationStrategyImplTest.java |   3 -
 ...ntitySerializationStrategyProxyV1_2Test.java |   4 +-
 ...ntitySerializationStrategyProxyV1_3Test.java |   5 +-
 ...cEntitySerializationStrategyProxyV2Test.java |   5 +-
 ...ccEntitySerializationStrategyV1ImplTest.java |   4 -
 ...ccEntitySerializationStrategyV2ImplTest.java |   5 -
 ...ccEntitySerializationStrategyV3ImplTest.java |   4 -
 .../MvccEntitySerializationStrategyV3Test.java  |   1 -
 .../usergrid/persistence/core/CPManager.java    |  28 ++++
 .../persistence/core/guice/CommonModule.java    |   1 -
 .../persistence/core/guice/CurrentImpl.java     |  35 -----
 .../persistence/core/guice/PreviousImpl.java    |  35 -----
 .../usergrid/persistence/core/guice/V2Impl.java |   4 +-
 .../usergrid/persistence/core/guice/V3Impl.java |   4 +-
 .../core/migration/data/DataMigration.java      |   5 +-
 .../data/DataMigrationManagerImpl.java          |  97 +++++++-----
 .../migration/schema/MigrationStrategy.java     |  58 +++++++
 .../core/rx/AllEntitiesInSystemObservable.java  |  40 +++++
 .../core/rx/ApplicationObservable.java          |  33 ++++
 .../core/scope/ApplicationEntityGroup.java      |  40 +++++
 .../core/scope/ApplicationScope.java            |   1 +
 .../core/guice/MaxMigrationVersion.java         |   6 +-
 .../data/DataMigrationManagerImplTest.java      |  38 +++--
 .../persistence/graph/GraphManager.java         |   3 +-
 .../persistence/graph/GraphManagerFactory.java  |   2 +
 .../persistence/graph/guice/GraphModule.java    |  32 ++--
 .../serialization/EdgeMigrationStrategy.java    |  33 ++++
 .../EdgesFromSourceObservable.java              |  32 ++++
 .../serialization/EdgesToTargetObservable.java  |  32 ++++
 .../graph/serialization/TargetIdObservable.java |  38 +++++
 .../EdgeMetadataSerializationProxyImpl.java     | 116 ++++++++++++--
 .../impl/EdgesFromSourceObservableImpl.java     |  62 ++++++++
 .../impl/EdgesToTargetObservableImpl.java       |  65 ++++++++
 .../impl/GraphManagerFactoryImpl.java           |  88 +++++++++++
 .../impl/TargetIdObservableImpl.java            |  70 +++++++++
 .../EdgeMetaDataSerializationProxyV1Test.java   |   2 +-
 .../EdgeMetaDataSerializationProxyV2Test.java   |   2 +-
 .../usergrid/persistence/map/MapManager.java    |   4 +-
 .../persistence/map/MapManagerFactory.java      |   2 +
 .../persistence/map/guice/MapModule.java        |  10 +-
 .../map/impl/MapManagerFactoryImpl.java         |  62 ++++++++
 .../usergrid/persistence/index/EntityIndex.java |   9 +-
 .../persistence/index/EntityIndexFactory.java   |   4 +-
 .../persistence/index/guice/IndexModule.java    |   6 +-
 .../index/impl/EntityIndexFactoryImpl.java      |  67 ++++++++
 92 files changed, 1929 insertions(+), 1352 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 93eb15e..0ca98e6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -35,7 +35,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.StringUtils;
 
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.AbstractEntity;
 import org.apache.usergrid.persistence.Entity;
@@ -54,6 +53,7 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.core.util.Health;
@@ -61,14 +61,10 @@ import org.apache.usergrid.persistence.entities.Application;
 import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsException;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
-import org.apache.usergrid.persistence.index.AliasedEntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.utils.UUIDUtils;
@@ -110,8 +106,10 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
             }
         });
 
-
     private ManagerCache managerCache;
+
+    private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
+
     private DataMigrationManager dataMigrationManager;
 
     CassandraService cass;
@@ -161,6 +159,12 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         return managerCache;
     }
 
+    private AllEntitiesInSystemObservable getAllEntitiesObservable(){
+        if(allEntitiesInSystemObservable==null)
+            allEntitiesInSystemObservable = CpSetup.getInjector().getInstance(AllEntitiesInSystemObservable.class);
+        return allEntitiesInSystemObservable;
+    }
+
 
     @Override
     public String getImplementationDescription() throws Exception {
@@ -426,8 +430,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
             if ( e == null ) {
                 logger.warn("Applicaion {} in index but not found in collections", targetId );
                 continue;
-            } 
-            
+            }
+
             appMap.put(
                 (String)e.getField( PROPERTY_NAME ).getValue(),
                 (UUID)e.getField( "applicationUuid" ).getValue());
@@ -576,7 +580,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     public long performEntityCount() {
         //TODO, this really needs to be a task that writes this data somewhere since this will get
         //progressively slower as the system expands
-        return AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).longCount().toBlocking().last();
+        return getAllEntitiesObservable().getAllEntitiesInSystem(1000).longCount().toBlocking().last();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
index e29e5c2..8292170 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
@@ -18,6 +18,7 @@ package org.apache.usergrid.corepersistence;
 
 import java.util.concurrent.ExecutionException;
 
+import com.amazonaws.services.cloudfront.model.InvalidArgumentException;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
@@ -45,35 +46,8 @@ public class CpManagerCache implements ManagerCache {
 
     // TODO: consider making these cache sizes and timeouts configurable
 
-    private LoadingCache<CollectionScope, EntityCollectionManager> ecmCache =
-            CacheBuilder.newBuilder().maximumSize( 1000 )
-                        .build( new CacheLoader<CollectionScope, EntityCollectionManager>() {
-                                    public EntityCollectionManager load( CollectionScope scope ) {
-                                        return ecmf.createCollectionManager( scope );
-                                    }
-                                } );
-
-    private LoadingCache<ApplicationScope, EntityIndex> eiCache =
-            CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<ApplicationScope, EntityIndex>() {
-                                                                     public EntityIndex load( ApplicationScope scope ) {
-                                                                         return eif.createEntityIndex( scope );
-                                                                     }
-                                                                 } );
-
-    private LoadingCache<ApplicationScope, GraphManager> gmCache =
-            CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<ApplicationScope, GraphManager>() {
-                                                                     public GraphManager load(
-                                                                             ApplicationScope scope ) {
-                                                                         return gmf.createEdgeManager( scope );
-                                                                     }
-                                                                 } );
-
-    private LoadingCache<MapScope, MapManager> mmCache =
-            CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<MapScope, MapManager>() {
-                                                                     public MapManager load( MapScope scope ) {
-                                                                         return mmf.createMapManager( scope );
-                                                                     }
-                                                                 } );
+
+
 
 
     @Inject
@@ -89,53 +63,34 @@ public class CpManagerCache implements ManagerCache {
 
     @Override
     public EntityCollectionManager getEntityCollectionManager( CollectionScope scope ) {
-        try {
-            return ecmCache.get( scope );
-        }
-        catch ( ExecutionException ex ) {
-            throw new RuntimeException( "Error getting manager", ex );
-        }
+        return ecmf.createCollectionManager(scope);
     }
 
 
     @Override
     public EntityIndex getEntityIndex( ApplicationScope appScope ) {
-        try {
-            return eiCache.get( appScope );
-        }
-        catch ( ExecutionException ex ) {
-            throw new RuntimeException( "Error getting manager", ex );
-        }
+
+            return eif.createEntityIndex( appScope );
+
     }
 
 
     @Override
     public GraphManager getGraphManager( ApplicationScope appScope ) {
-        try {
-            return gmCache.get( appScope );
-        }
-        catch ( ExecutionException ex ) {
-            throw new RuntimeException( "Error getting manager", ex );
-        }
+        return gmf.createEdgeManager(appScope);
     }
 
 
     @Override
     public MapManager getMapManager( MapScope mapScope ) {
-        try {
-            return mmCache.get( mapScope );
-        }
-        catch ( ExecutionException ex ) {
-            throw new RuntimeException( "Error getting manager", ex );
-        }
+        return mmf.createMapManager(mapScope);
     }
 
-
     @Override
     public void invalidate() {
-        ecmCache.invalidateAll();
-        eiCache.invalidateAll();
-        gmCache.invalidateAll();
-        mmCache.invalidateAll();
+        ecmf.invalidate();
+        eif.invalidate();
+        gmf.invalidate();
+        mmf.invalidate();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
index ed6cba2..b58a246 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
@@ -18,20 +18,26 @@ package org.apache.usergrid.corepersistence;
 import com.google.inject.AbstractModule;
 import com.google.inject.multibindings.Multibinder;
 
-import org.apache.usergrid.corepersistence.migration.EntityDataMigration;
 import org.apache.usergrid.corepersistence.migration.EntityTypeMappingMigration;
-import org.apache.usergrid.corepersistence.migration.GraphShardVersionMigration;
 import org.apache.usergrid.corepersistence.events.EntityDeletedHandler;
 import org.apache.usergrid.corepersistence.events.EntityVersionCreatedHandler;
 import org.apache.usergrid.corepersistence.events.EntityVersionDeletedHandler;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
+import org.apache.usergrid.corepersistence.rx.impl.ApplicationObservableImpl;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 import org.apache.usergrid.persistence.collection.event.EntityDeleted;
 import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
 import org.apache.usergrid.persistence.collection.guice.CollectionModule;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyProxy;
 import org.apache.usergrid.persistence.core.guice.CommonModule;
+import org.apache.usergrid.persistence.core.guice.ProxyImpl;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
 import org.apache.usergrid.persistence.graph.guice.GraphModule;
+import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationProxyImpl;
 import org.apache.usergrid.persistence.index.guice.IndexModule;
 import org.apache.usergrid.persistence.map.guice.MapModule;
 import org.apache.usergrid.persistence.queue.guice.QueueModule;
@@ -69,18 +75,19 @@ public class GuiceModule extends AbstractModule {
         install(new QueueModule());
 
         bind(ManagerCache.class).to( CpManagerCache.class );
+        bind(AllEntitiesInSystemObservable.class).to( AllEntitiesInSystemObservableImpl.class );
+        bind(ApplicationObservable.class).to( ApplicationObservableImpl.class );
 
-        Multibinder<DataMigration> dataMigrationMultibinder = 
+
+        Multibinder<DataMigration> dataMigrationMultibinder =
                 Multibinder.newSetBinder( binder(), DataMigration.class );
         dataMigrationMultibinder.addBinding().to( EntityTypeMappingMigration.class );
-        dataMigrationMultibinder.addBinding().to( GraphShardVersionMigration.class );
-        dataMigrationMultibinder.addBinding().to( EntityDataMigration.class );
 
-        Multibinder<EntityDeleted> entityBinder = 
+        Multibinder<EntityDeleted> entityBinder =
             Multibinder.newSetBinder(binder(), EntityDeleted.class);
         entityBinder.addBinding().to(EntityDeletedHandler.class);
 
-        Multibinder<EntityVersionDeleted> versionBinder = 
+        Multibinder<EntityVersionDeleted> versionBinder =
             Multibinder.newSetBinder(binder(), EntityVersionDeleted.class);
         versionBinder.addBinding().to(EntityVersionDeletedHandler.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java
index c1b7b95..921fde5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java
@@ -33,6 +33,7 @@ import org.apache.usergrid.persistence.map.MapScope;
  * The cache of the manager
  */
 public interface ManagerCache {
+
     /**
      * Get the entity collection manager for the specified scope
      * @param scope
@@ -42,27 +43,30 @@ public interface ManagerCache {
 
     /**
      * Get the entity index for the specified app scope
+     *
      * @param appScope
      * @return
      */
-    EntityIndex getEntityIndex( ApplicationScope appScope );
+    EntityIndex getEntityIndex(ApplicationScope appScope);
 
     /**
      * Get the graph manager for the graph scope
+     *
      * @param appScope
      * @return
      */
-    GraphManager getGraphManager( ApplicationScope appScope );
+    GraphManager getGraphManager(ApplicationScope appScope);
 
     /**
      * Get the map manager for the map scope
+     *
      * @param mapScope
      * @return
      */
-    MapManager getMapManager( MapScope mapScope );
+    MapManager getMapManager(MapScope mapScope);
 
     /**
-     * Invalidate all cache entries
+     * invalidate the cache
      */
     void invalidate();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
deleted file mode 100644
index 847d717..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.migration;
-
-
-import java.util.Iterator;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.core.guice.*;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.google.inject.Inject;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import rx.functions.Action1;
-
-
-/**
- * Migration for migrating graph edges to the new Shards
- */
-public class EntityDataMigration implements DataMigration {
-
-
-    private final MvccEntitySerializationStrategy v1Serialization;
-    private final MvccEntitySerializationStrategy v2Serialization;
-
-    private final ManagerCache managerCache;
-    private final Keyspace keyspace;
-
-
-    @Inject
-    public EntityDataMigration( @PreviousImpl final MvccEntitySerializationStrategy v1Serialization,
-                                @CurrentImpl final MvccEntitySerializationStrategy v2Serialization,
-                                final ManagerCache managerCache, final Keyspace keyspace ) {
-        this.v1Serialization = v1Serialization;
-        this.v2Serialization = v2Serialization;
-        this.managerCache = managerCache;
-        this.keyspace = keyspace;
-    }
-
-
-    @Override
-    public void migrate( final ProgressObserver observer ) throws Throwable {
-
-        final AtomicLong atomicLong = new AtomicLong();
-
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
-                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
-
-
-                                                 @Override
-                                                 public void call(
-                                                         final AllEntitiesInSystemObservable.ApplicationEntityGroup
-                                                                 applicationEntityGroup ) {
-
-
-                                                     final UUID now = UUIDGenerator.newTimeUUID();
-
-                                                     final Id appScopeId =
-                                                             applicationEntityGroup.applicationScope.getApplication();
-
-
-                                                     final MutationBatch totalBatch = keyspace.prepareMutationBatch();
-
-                                                     //go through each entity in the system, and load it's entire
-                                                     // history
-                                                     for ( Id entityId : applicationEntityGroup.entityIds ) {
-
-                                                         CollectionScope currentScope = CpNamingUtils
-                                                                 .getCollectionScopeNameFromEntityType( appScopeId,
-                                                                         entityId.getType() );
-
-
-                                                         //for each element in the history in the previous version,
-                                                         // copy it to the CF in v2
-                                                         Iterator<MvccEntity> allVersions = v1Serialization
-                                                                 .loadDescendingHistory( currentScope, entityId, now,
-                                                                         1000 );
-
-                                                         while ( allVersions.hasNext() ) {
-                                                             final MvccEntity version = allVersions.next();
-
-                                                             final MutationBatch versionBatch =
-                                                                     v2Serialization.write( currentScope, version );
-
-                                                             totalBatch.mergeShallow( versionBatch );
-
-                                                             if ( atomicLong.incrementAndGet() % 50 == 0 ) {
-                                                                 executeBatch( totalBatch, observer, atomicLong );
-                                                             }
-                                                         }
-                                                     }
-
-                                                     executeBatch( totalBatch, observer, atomicLong );
-                                                 }
-                                             } ).toBlocking().last();
-    }
-
-
-    private void executeBatch( final MutationBatch batch, final ProgressObserver po, final AtomicLong count ) {
-        try {
-            batch.execute();
-
-            po.update( getVersion(), "Finished copying " + count + " entities to the new format" );
-        }
-        catch ( ConnectionException e ) {
-            po.failed( getVersion(), "Failed to execute mutation in cassandra" );
-            throw new DataMigrationException( "Unable to migrate batches ", e );
-        }
-    }
-
-
-    @Override
-    public int getVersion() {
-        return Versions.VERSION_3;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
index 8089dfd..8352cff 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
@@ -23,20 +23,23 @@ package org.apache.usergrid.corepersistence.migration;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
 import org.apache.usergrid.persistence.map.MapManager;
 import org.apache.usergrid.persistence.map.MapScope;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.inject.Inject;
 
+import rx.Observable;
+import rx.Scheduler;
 import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
 
 
 /**
@@ -45,44 +48,41 @@ import rx.functions.Action1;
 public class EntityTypeMappingMigration implements DataMigration {
 
     private final ManagerCache managerCache;
-
+    private final AllEntitiesInSystemObservable allEntitiesInSystemObservable;
 
 
     @Inject
-    public EntityTypeMappingMigration( final ManagerCache managerCache) {
+    public EntityTypeMappingMigration( final ManagerCache managerCache, final AllEntitiesInSystemObservable allEntitiesInSystemObservable) {
        this.managerCache = managerCache;
+        this.allEntitiesInSystemObservable = allEntitiesInSystemObservable;
     }
 
 
     @Override
-    public void migrate( final ProgressObserver observer ) throws Throwable {
+    public Observable migrate(final ApplicationEntityGroup applicationEntityGroup, final ProgressObserver observer) throws Throwable {
 
         final AtomicLong atomicLong = new AtomicLong();
 
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem(managerCache, 1000 )
-                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
-
-
-                                         @Override
-                                         public void call( final AllEntitiesInSystemObservable.ApplicationEntityGroup applicationEntityGroup ) {
-
-                                             final MapScope ms = CpNamingUtils.getEntityTypeMapScope( applicationEntityGroup.applicationScope.getApplication() );
-
-
-                                             final MapManager mapManager = managerCache.getMapManager( ms );
-
-                                             for(Id entityId: applicationEntityGroup.entityIds) {
-                                                 final UUID entityUuid = entityId.getUuid();
-                                                 final String entityType = entityId.getType();
-
-                                                 mapManager.putString( entityUuid.toString(), entityType );
-
-                                                 if ( atomicLong.incrementAndGet() % 100 == 0 ) {
-                                                     updateStatus( atomicLong, observer );
-                                                 }
-                                             }
-                                         }
-                                     } ).toBlocking().lastOrDefault( null );
+        final MapScope ms = CpNamingUtils.getEntityTypeMapScope(applicationEntityGroup.applicationScope.getApplication());
+
+        final MapManager mapManager = managerCache.getMapManager(ms);
+        return Observable.from(applicationEntityGroup.entityIds).subscribeOn(Schedulers.io())
+            .map(new Func1<Id, Long>() {
+                @Override
+                public Long call(Id id) {
+                    for (Id entityId : applicationEntityGroup.entityIds) {
+                        final UUID entityUuid = entityId.getUuid();
+                        final String entityType = entityId.getType();
+
+                        mapManager.putString(entityUuid.toString(), entityType);
+
+                        if (atomicLong.incrementAndGet() % 100 == 0) {
+                            updateStatus(atomicLong, observer);
+                        }
+                    }
+                    return atomicLong.get();
+                }
+            });
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
deleted file mode 100644
index 21f945b..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- *
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
- *
- */
-
-package org.apache.usergrid.corepersistence.migration;
-
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.usergrid.persistence.core.guice.CurrentImpl;
-import org.apache.usergrid.persistence.core.guice.V2Impl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.corepersistence.rx.EdgesFromSourceObservable;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Func1;
-
-
-/**
- * Migration for migrating graph edges to the new Shards
- */
-public class GraphShardVersionMigration implements DataMigration {
-
-
-    private static final Logger logger = LoggerFactory.getLogger( GraphShardVersionMigration.class );
-
-
-    private final EdgeMetadataSerialization v2Serialization;
-
-    private final ManagerCache managerCache;
-    private final Keyspace keyspace;
-
-
-    @Inject
-    public GraphShardVersionMigration( @CurrentImpl final EdgeMetadataSerialization v2Serialization,
-                                       final ManagerCache managerCache, final Keyspace keyspace ) {
-        this.v2Serialization = v2Serialization;
-        this.managerCache = managerCache;
-        this.keyspace = keyspace;
-    }
-
-
-    @Override
-    public void migrate( final ProgressObserver observer ) throws Throwable {
-
-        final AtomicLong counter = new AtomicLong();
-
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).flatMap(
-                new Func1<AllEntitiesInSystemObservable.ApplicationEntityGroup, Observable<List<Edge>>>() {
-
-
-                    @Override
-                    public Observable<List<Edge>> call(
-                            final AllEntitiesInSystemObservable.ApplicationEntityGroup applicationEntityGroup ) {
-
-                        //emit a stream of all ids from this group
-                        return Observable.from( applicationEntityGroup.entityIds )
-                                         .flatMap( new Func1<Id, Observable<List<Edge>>>() {
-
-
-                                             //for each id in the group, get it's edges
-                                             @Override
-                                             public Observable<List<Edge>> call( final Id id ) {
-                                                 logger.info( "Migrating edges from node {} in scope {}", id,
-                                                         applicationEntityGroup.applicationScope );
-
-                                                 final GraphManager gm = managerCache
-                                                         .getGraphManager( applicationEntityGroup.applicationScope );
-
-                                                 //get each edge from this node as a source
-                                                 return EdgesFromSourceObservable.edgesFromSource( gm, id )
-
-                                                         //for each edge, re-index it in v2  every 1000 edges or less
-                                                         .buffer( 1000 ).doOnNext( new Action1<List<Edge>>() {
-                                                             @Override
-                                                             public void call( final List<Edge> edges ) {
-
-                                                                 final MutationBatch batch =
-                                                                         keyspace.prepareMutationBatch();
-
-                                                                 for ( final Edge edge : edges ) {
-                                                                     logger.info( "Migrating meta for edge {}", edge );
-                                                                     final MutationBatch edgeBatch = v2Serialization
-                                                                             .writeEdge(
-                                                                                     applicationEntityGroup
-                                                                                             .applicationScope,
-                                                                                     edge );
-                                                                     batch.mergeShallow( edgeBatch );
-                                                                 }
-
-                                                                 try {
-                                                                     batch.execute();
-                                                                 }
-                                                                 catch ( ConnectionException e ) {
-                                                                     throw new RuntimeException(
-                                                                             "Unable to perform migration", e );
-                                                                 }
-
-                                                                 //update the observer so the admin can see it
-                                                                 final long newCount =
-                                                                         counter.addAndGet( edges.size() );
-
-                                                                 observer.update( getVersion(), String.format(
-                                                                         "Currently running.  Rewritten %d edge types",
-                                                                         newCount ) );
-                                                             }
-                                                         } );
-                                             }
-                                         } );
-                    }
-                } ).toBlocking().lastOrDefault( null );
-        ;
-    }
-
-
-    @Override
-    public int getVersion() {
-        return Versions.VERSION_2;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
index 636ff53..7d05733 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
@@ -22,10 +22,9 @@
 package org.apache.usergrid.corepersistence.migration;
 
 
-import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyProxy;
-import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyProxyV1Impl;
-import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyProxyV2Impl;
-import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationProxyImpl;
+import org.apache.usergrid.persistence.core.guice.V2Impl;
+import org.apache.usergrid.persistence.core.guice.V3Impl;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMigrationStrategy;
 
 
 /**
@@ -41,10 +40,16 @@ public class Versions {
     /**
      * Version 2.  Edge meta changes
      */
-    public static final int VERSION_2 = EdgeMetadataSerializationProxyImpl.MIGRATION_VERSION;
+    public static final int VERSION_2 = EdgeMigrationStrategy.MIGRATION_VERSION;
 
-    public static final int VERSION_3 = MvccEntitySerializationStrategyProxyV1Impl.MIGRATION_VERSION;
+    /**
+     * Version 3. migrate from entity serialization 1 -> 2
+     */
+    public static final int VERSION_3 = V2Impl.MIGRATION_VERSION;
 
-    public static final int VERSION_4 = MvccEntitySerializationStrategyProxyV2Impl.MIGRATION_VERSION;
+    /**
+     * Version 4. migrate from entity serialization 1 -> 2
+     */
+    public static final int VERSION_4 = V3Impl.MIGRATION_VERSION;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
deleted file mode 100644
index 771b81f..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.rx;
-
-
-import java.util.List;
-
-import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-import rx.functions.Func1;
-
-
-/**
- * An observable that will emit every entity Id stored in our entire system across all apps.
- * Note that this only walks each application applicationId graph, and emits edges from the applicationId and it's edges as the s
- * source node
- */
-public class AllEntitiesInSystemObservable {
-
-
-    /**
-     * Return an observable that emits all entities in the system.
-     * @param managerCache the managerCache to use
-     * @param bufferSize The amount of entityIds to buffer into each ApplicationEntityGroup.  Note that if we exceed the buffer size
-     * you may be more than 1 ApplicationEntityGroup with the same application and different ids
-     */
-    public static Observable<ApplicationEntityGroup> getAllEntitiesInSystem( final ManagerCache managerCache, final int bufferSize) {
-        //traverse all nodes in the graph, load all source edges from them, then re-save the meta data
-        return ApplicationObservable.getAllApplicationIds( managerCache )
-
-                                    .flatMap( new Func1<Id, Observable<ApplicationEntityGroup>>() {
-                                        @Override
-                                        public Observable<ApplicationEntityGroup> call( final Id applicationId ) {
-
-                                            //set up our application scope and graph manager
-                                            final ApplicationScope applicationScope = new ApplicationScopeImpl(
-                                                    applicationId );
-
-
-                                            final GraphManager gm =
-                                                    managerCache.getGraphManager( applicationScope );
-
-
-                                            //load all nodes that are targets of our application node.  I.E.
-                                            // entities that have been saved
-                                            final Observable<Id> entityNodes =
-                                                    TargetIdObservable.getTargetNodes(gm, applicationId );
-
-                                            //create our application node to emit since it's an entity as well
-                                            final Observable<Id> applicationNode = Observable.just( applicationId );
-
-                                            //merge both the specified application node and the entity node
-                                            // so they all get used
-                                            return Observable.merge( applicationNode, entityNodes ).buffer(bufferSize)
-                                                             .map( new Func1<List<Id>, ApplicationEntityGroup>() {
-                                                                 @Override
-                                                                 public ApplicationEntityGroup call( final List<Id> id ) {
-                                                                     return new ApplicationEntityGroup( applicationScope, id );
-                                                                 }
-                                                             } );
-                                        }
-                                    } );
-    }
-
-
-    /**
-     * Get the entity data.  Immutable bean for fast access
-     */
-    public static final class ApplicationEntityGroup {
-        public final ApplicationScope applicationScope;
-        public final List<Id> entityIds;
-
-
-        public ApplicationEntityGroup( final ApplicationScope applicationScope, final List<Id> entityIds ) {
-            this.applicationScope = applicationScope;
-            this.entityIds = entityIds;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
deleted file mode 100644
index 6019bca..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.rx;
-
-
-import java.util.Arrays;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-import rx.functions.Func1;
-
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateApplicationId;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicationScope;
-
-
-/**
- * An observable that will emit all application stored in the system.
- */
-public class ApplicationObservable {
-
-    private static final Logger logger = LoggerFactory.getLogger( ApplicationObservable.class );
-
-    /**
-     * Get all applicationIds as an observable
-     */
-    public static Observable<Id> getAllApplicationIds( final ManagerCache managerCache ) {
-
-        //emit our 3 hard coded applications that are used the manage the system first.
-        //this way consumers can perform whatever work they need to on the root system first
-
-
-        final Observable<Id> systemIds = Observable.from( Arrays
-                .asList( generateApplicationId( CpNamingUtils.DEFAULT_APPLICATION_ID ),
-                        generateApplicationId( CpNamingUtils.MANAGEMENT_APPLICATION_ID ),
-                        generateApplicationId( CpNamingUtils.SYSTEM_APP_ID ) ) );
-
-
-        final ApplicationScope appScope = getApplicationScope( CpNamingUtils.SYSTEM_APP_ID );
-
-        final CollectionScope appInfoCollectionScope =
-                new CollectionScopeImpl( appScope.getApplication(), appScope.getApplication(),
-                        CpNamingUtils.getCollectionScopeNameFromCollectionName( CpNamingUtils.APPINFOS ) );
-
-        final EntityCollectionManager collectionManager =
-                managerCache.getEntityCollectionManager( appInfoCollectionScope );
-
-
-        final GraphManager gm = managerCache.getGraphManager( appScope );
-
-
-        String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( CpNamingUtils.APPINFOS );
-
-        Id rootAppId = appScope.getApplication();
-
-
-        //we have app infos.  For each of these app infos, we have to load the application itself
-        Observable<Id> appIds = gm.loadEdgesFromSource(
-                new SimpleSearchByEdgeType( rootAppId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                        null ) ).flatMap( new Func1<Edge, Observable<Id>>() {
-            @Override
-            public Observable<Id> call( final Edge edge ) {
-                //get the app info and load it
-                final Id appInfo = edge.getTargetNode();
-
-                return collectionManager.load( appInfo )
-                        //filter out null entities
-                        .filter( new Func1<Entity, Boolean>() {
-                            @Override
-                            public Boolean call( final Entity entity ) {
-                                if ( entity == null ) {
-                                    logger.warn( "Encountered a null application info for id {}", appInfo );
-                                    return false;
-                                }
-
-                                return true;
-                            }
-                        } )
-                                //get the id from the entity
-                        .map( new Func1<org.apache.usergrid.persistence.model.entity.Entity, Id>() {
-
-
-                            @Override
-                            public Id call( final org.apache.usergrid.persistence.model.entity.Entity entity ) {
-
-                                final UUID uuid = ( UUID ) entity.getField( "applicationUuid" ).getValue();
-
-                                return CpNamingUtils.generateApplicationId( uuid );
-                            }
-                        } );
-            }
-        } );
-
-        return Observable.merge( systemIds, appIds );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
deleted file mode 100644
index d3e2ee5..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.rx;
-
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-import rx.functions.Func1;
-
-
-/**
- * Emits the edges that are edges from the specified source node
- */
-public class EdgesFromSourceObservable {
-
-    private static final Logger logger = LoggerFactory.getLogger( EdgesFromSourceObservable.class );
-
-
-    /**
-     * Get all edges from the source
-     */
-    public static Observable<Edge> edgesFromSource( final GraphManager gm, final Id sourceNode){
-        Observable<String> edgeTypes = gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) );
-
-        return edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
-            @Override
-            public Observable<Edge> call( final String edgeType ) {
-
-                logger.debug( "Loading edges of edgeType {} from {}", edgeType, sourceNode );
-
-                return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE,
-                        SearchByEdgeType.Order.DESCENDING, null ) );
-            }
-        } );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java
deleted file mode 100644
index c5dc54d..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.rx;
-
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-import rx.functions.Func1;
-
-
-/**
- * Emits the id of all edges where the given node is the target node
- */
-public class EdgesToTargetObservable {
-
-    private static final Logger logger = LoggerFactory.getLogger( EdgesToTargetObservable.class );
-
-
-    /**
-     * Get all edges from the source
-     */
-    public static Observable<Edge> getEdgesToTarget(final GraphManager gm,  final Id targetNode) {
-        Observable<String> edgeTypes = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( targetNode, null, null ) );
-
-        return edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
-            @Override
-            public Observable<Edge> call( final String edgeType ) {
-
-                logger.debug( "Loading edges of edgeType {} to {}", edgeType, targetNode);
-
-                return gm.loadEdgesToTarget( new SimpleSearchByEdgeType( targetNode, edgeType, Long.MAX_VALUE,
-                        SearchByEdgeType.Order.DESCENDING, null ) );
-            }
-        } );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
deleted file mode 100644
index c4b6526..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.rx;
-
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-import rx.functions.Func1;
-
-
-/**
- * Emits the id of all nodes that are target nodes from the given source node
- */
-public class TargetIdObservable {
-
-    private static final Logger logger = LoggerFactory.getLogger( TargetIdObservable.class );
-
-
-    /**
-     * Get all nodes that are target nodes from the sourceNode
-     * @param gm
-     * @param sourceNode
-     *
-     * @return
-     */
-    public static Observable<Id> getTargetNodes( final GraphManager gm,  final Id sourceNode) {
-
-        //only search edge types that start with collections
-       return EdgesFromSourceObservable.edgesFromSource(gm, sourceNode ).map( new Func1<Edge, Id>() {
-
-
-           @Override
-           public Id call( final Edge edge ) {
-               final Id targetNode = edge.getTargetNode();
-
-               logger.debug( "Emitting targetId of {}", edge );
-
-
-               return targetNode;
-           }
-       } );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
new file mode 100644
index 0000000..4608bd2
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import java.util.List;
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+import rx.functions.Func1;
+
+
+/**
+ * An observable that will emit every entity Id stored in our entire system across all apps.
+ * Note that this only walks each application applicationId graph, and emits edges from the applicationId and it's edges as the s
+ * source node
+ */
+public class AllEntitiesInSystemObservableImpl implements AllEntitiesInSystemObservable {
+
+    private final ApplicationObservable applicationObservable;
+    private final GraphManagerFactory graphManagerFactory;
+    private final TargetIdObservable targetIdObservable;
+
+    public AllEntitiesInSystemObservableImpl(ApplicationObservable applicationObservable, GraphManagerFactory graphManagerFactory, TargetIdObservable targetIdObservable){
+
+        this.applicationObservable = applicationObservable;
+        this.graphManagerFactory = graphManagerFactory;
+        this.targetIdObservable = targetIdObservable;
+    }
+
+
+    public  Observable<ApplicationEntityGroup> getAllEntitiesInSystem(final int bufferSize) {
+        //traverse all nodes in the graph, load all source edges from them, then re-save the meta data
+        return applicationObservable.getAllApplicationIds( )
+
+                                    .flatMap( new Func1<Id, Observable<ApplicationEntityGroup>>() {
+                                        @Override
+                                        public Observable<ApplicationEntityGroup> call( final Id applicationId ) {
+
+                                            //set up our application scope and graph manager
+                                            final ApplicationScope applicationScope = new ApplicationScopeImpl(
+                                                    applicationId );
+
+                                            final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope);
+
+                                            //load all nodes that are targets of our application node.  I.E.
+                                            // entities that have been saved
+                                            final Observable<Id> entityNodes =
+                                                    targetIdObservable.getTargetNodes(gm, applicationId);
+
+
+                                            //get scope here
+
+
+                                            //emit Scope + ID
+
+                                            //create our application node to emit since it's an entity as well
+                                            final Observable<Id> applicationNode = Observable.just( applicationId );
+
+                                            //merge both the specified application node and the entity node
+                                            // so they all get used
+                                            return Observable
+                                                .merge(applicationNode, entityNodes)
+                                                    .buffer(bufferSize)
+                                                    .map(new Func1<List<Id>, ApplicationEntityGroup>() {
+                                                        @Override
+                                                        public ApplicationEntityGroup call(final List<Id> ids) {
+                                                            //CpNamingUtils.getCollectionScopeNameFromEntityType(applicationId,);
+
+                                                            return new ApplicationEntityGroup(applicationScope, ids);
+                                                        }
+                                                    });
+                                        }
+                                    } );
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java
new file mode 100644
index 0000000..10a043d
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import java.util.Arrays;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+import rx.functions.Func1;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateApplicationId;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicationScope;
+
+
+/**
+ * An observable that will emit all application stored in the system.
+ */
+public class ApplicationObservableImpl implements ApplicationObservable {
+
+    private static final Logger logger = LoggerFactory.getLogger( ApplicationObservableImpl.class );
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final GraphManagerFactory graphManagerFactory;
+
+    public ApplicationObservableImpl(EntityCollectionManagerFactory entityCollectionManagerFactory, GraphManagerFactory graphManagerFactory){
+
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.graphManagerFactory = graphManagerFactory;
+    }
+
+
+    @Override
+    public Observable<Id> getAllApplicationIds() {
+
+        //emit our 3 hard coded applications that are used the manage the system first.
+        //this way consumers can perform whatever work they need to on the root system first
+        final Observable<Id> systemIds = Observable.from( Arrays
+                .asList( generateApplicationId( CpNamingUtils.DEFAULT_APPLICATION_ID ),
+                        generateApplicationId( CpNamingUtils.MANAGEMENT_APPLICATION_ID ),
+                        generateApplicationId( CpNamingUtils.SYSTEM_APP_ID ) ) );
+
+
+        final ApplicationScope appScope = getApplicationScope( CpNamingUtils.SYSTEM_APP_ID );
+
+        final CollectionScope appInfoCollectionScope =
+                new CollectionScopeImpl( appScope.getApplication(), appScope.getApplication(),
+                        CpNamingUtils.getCollectionScopeNameFromCollectionName( CpNamingUtils.APPINFOS ) );
+
+        final EntityCollectionManager collectionManager =
+                entityCollectionManagerFactory.createCollectionManager( appInfoCollectionScope );
+
+
+        final GraphManager gm = graphManagerFactory.createEdgeManager(appScope);
+
+
+        String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( CpNamingUtils.APPINFOS );
+
+        Id rootAppId = appScope.getApplication();
+
+
+        //we have app infos.  For each of these app infos, we have to load the application itself
+        Observable<Id> appIds = gm.loadEdgesFromSource(
+                new SimpleSearchByEdgeType( rootAppId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                        null ) ).flatMap( new Func1<Edge, Observable<Id>>() {
+            @Override
+            public Observable<Id> call( final Edge edge ) {
+                //get the app info and load it
+                final Id appInfo = edge.getTargetNode();
+
+                return collectionManager.load( appInfo )
+                        //filter out null entities
+                        .filter( new Func1<Entity, Boolean>() {
+                            @Override
+                            public Boolean call( final Entity entity ) {
+                                if ( entity == null ) {
+                                    logger.warn( "Encountered a null application info for id {}", appInfo );
+                                    return false;
+                                }
+
+                                return true;
+                            }
+                        } )
+                                //get the id from the entity
+                        .map( new Func1<org.apache.usergrid.persistence.model.entity.Entity, Id>() {
+
+
+                            @Override
+                            public Id call( final org.apache.usergrid.persistence.model.entity.Entity entity ) {
+
+                                final UUID uuid = ( UUID ) entity.getField( "applicationUuid" ).getValue();
+
+                                return CpNamingUtils.generateApplicationId( uuid );
+                            }
+                        } );
+            }
+        } );
+
+        return Observable.merge( systemIds, appIds );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
index 0d7b6ff..154519b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.map.MapScope;
 import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
+import rx.functions.Func1;
 
 
 /**
@@ -72,6 +73,7 @@ public class CpNamingUtils {
     public static String TYPES_BY_UUID_MAP = "zzz_typesbyuuid_zzz";
 
 
+
     /**
      * Generate a collection scope for a collection within the application's Id for the given type
      * @param applicationId The applicationId that owns this entity

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
index b677769..2509771 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
@@ -24,10 +24,11 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
 
-import org.apache.usergrid.persistence.core.guice.CurrentImpl;
-import org.apache.usergrid.persistence.core.guice.PreviousImpl;
-import org.apache.usergrid.persistence.core.guice.V1Impl;
-import org.apache.usergrid.persistence.core.guice.V2Impl;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyProxyV2Impl;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -36,7 +37,7 @@ import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.corepersistence.CpSetup;
 import org.apache.usergrid.corepersistence.EntityWriteHelper;
 import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
@@ -44,7 +45,7 @@ import org.apache.usergrid.persistence.EntityManagerFactory;
 import org.apache.usergrid.persistence.SimpleEntityRef;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
 import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
@@ -68,8 +69,7 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
     private Injector injector;
 
 
-    private EntityDataMigration entityDataMigration;
-    private ManagerCache managerCache;
+    private DataMigration entityDataMigration;
     private DataMigrationManager dataMigrationManager;
     private MigrationInfoSerialization migrationInfoSerialization;
     private MvccEntitySerializationStrategy v1Strategy;
@@ -83,18 +83,20 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
      */
     @Rule
     public MigrationTestRule migrationTestRule =
-            new MigrationTestRule( app, CpSetup.getInjector() ,EntityDataMigration.class  );
+            new MigrationTestRule( app, CpSetup.getInjector() ,MvccEntitySerializationStrategyProxyV2Impl.class  );
+    private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
 
     @Before
     public void setup() {
         emf = setup.getEmf();
         injector = CpSetup.getInjector();
-        entityDataMigration = injector.getInstance( EntityDataMigration.class );
-        managerCache = injector.getInstance( ManagerCache.class );
+        entityDataMigration = injector.getInstance( MvccEntitySerializationStrategyProxyV2Impl.class );
         dataMigrationManager = injector.getInstance( DataMigrationManager.class );
         migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
-        v1Strategy = injector.getInstance( Key.get(MvccEntitySerializationStrategy.class, PreviousImpl.class) );
-        v2Strategy = injector.getInstance( Key.get(MvccEntitySerializationStrategy.class, CurrentImpl.class) );
+        MvccEntityMigrationStrategy strategy = injector.getInstance(Key.get(MvccEntityMigrationStrategy.class));
+        allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
+        v1Strategy = strategy.getMigration().from();
+        v2Strategy = strategy.getMigration().to();
     }
 
 
@@ -133,11 +135,11 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
         //read everything in previous version format and put it into our types.  Assumes we're
         //using a test system, and it's not a huge amount of data, otherwise we'll overflow.
 
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
-            .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+        allEntitiesInSystemObservable.getAllEntitiesInSystem(  1000)
+            .doOnNext( new Action1<ApplicationEntityGroup>() {
                 @Override
                 public void call(
-                        final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
+                        final ApplicationEntityGroup entity ) {
 
                     //add all versions from history to our comparison
                     for ( final Id id : entity.entityIds ) {
@@ -169,7 +171,18 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
         assertTrue( "Saved new entities", savedEntities.size() > 0 );
 
         //perform the migration
-        entityDataMigration.migrate( progressObserver );
+        allEntitiesInSystemObservable.getAllEntitiesInSystem(  1000)
+            .doOnNext(new Action1<ApplicationEntityGroup>() {
+                @Override
+                public void call(ApplicationEntityGroup applicationEntityGroup) {
+                   try {
+                       entityDataMigration.migrate(applicationEntityGroup, progressObserver).toBlocking().last();
+                   }catch (Throwable e){
+                       throw new RuntimeException(e);
+                   }
+                }
+            }).toBlocking().last();
+
 
         assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
         assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
@@ -185,12 +198,11 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
 
 
         //now visit all entities in the system again, load them from v2, and ensure they're the same
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
-            .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+        allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
+            .doOnNext( new Action1<ApplicationEntityGroup>() {
                 @Override
                 public void call(
-                        final AllEntitiesInSystemObservable
-                                .ApplicationEntityGroup entity ) {
+                        final ApplicationEntityGroup entity ) {
                     //add all versions from history to our comparison
                     for ( final Id id : entity.entityIds ) {
 
@@ -222,12 +234,11 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
 
         //now visit all entities in the system again, and load them from the EM,
         // ensure we see everything we did in the v1 traversal
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
-            .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+        allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
+            .doOnNext( new Action1<ApplicationEntityGroup>() {
                 @Override
                 public void call(
-                        final AllEntitiesInSystemObservable
-                                .ApplicationEntityGroup entity ) {
+                        final ApplicationEntityGroup entity ) {
 
                     final EntityManager em = emf.getEntityManager(
                             entity.applicationScope.getApplication().getUuid() );


[2/4] incubator-usergrid git commit: pushing migrations into core persistence

Posted by sf...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
index d0a87c3..10765f1 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@ -35,7 +35,7 @@ import org.junit.Assert;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.util.LogEntryMock;
@@ -61,7 +61,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import org.mockito.internal.util.collections.Sets;
 
 
 /**
@@ -79,7 +78,7 @@ public class EntityVersionCleanupTaskTest {
 
 
     @Test(timeout=10000)
-    public void noListenerOneVersion() 
+    public void noListenerOneVersion()
             throws ExecutionException, InterruptedException, ConnectionException {
 
 
@@ -109,7 +108,7 @@ public class EntityVersionCleanupTaskTest {
 
         final Id applicationId = new SimpleId( "application" );
 
-        final CollectionScope appScope = new CollectionScopeImpl( 
+        final CollectionScope appScope = new CollectionScopeImpl(
                 applicationId, applicationId, "users" );
 
         final Id entityId = new SimpleId( "user" );
@@ -148,10 +147,10 @@ public class EntityVersionCleanupTaskTest {
 
         final List<MvccEntity> mel = new ArrayList<MvccEntity>();
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
 
         when( ess.loadDescendingHistory(
@@ -175,7 +174,7 @@ public class EntityVersionCleanupTaskTest {
      * Tests the cleanup task on the first version created
      */
     @Test(timeout=10000)
-    public void noListenerNoVersions() 
+    public void noListenerNoVersions()
             throws ExecutionException, InterruptedException, ConnectionException {
 
 
@@ -208,14 +207,14 @@ public class EntityVersionCleanupTaskTest {
         final Id applicationId = new SimpleId( "application" );
 
 
-        final CollectionScope appScope = new CollectionScopeImpl( 
+        final CollectionScope appScope = new CollectionScopeImpl(
                 applicationId, applicationId, "users" );
 
         final Id entityId = new SimpleId( "user" );
 
 
         //mock up a single log entry for our first test
-        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( 
+        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock(
                 mvccLogEntrySerializationStrategy, appScope, entityId, 1 );
 
 
@@ -249,10 +248,10 @@ public class EntityVersionCleanupTaskTest {
 
         final List<MvccEntity> mel = new ArrayList<MvccEntity>();
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
 
         when( ess.loadDescendingHistory( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
@@ -267,7 +266,7 @@ public class EntityVersionCleanupTaskTest {
 
 
         // These last two verify statements do not make sense. We cannot assert that the entity
-        // and log batches are never called. Even if there are no listeners the entity delete 
+        // and log batches are never called. Even if there are no listeners the entity delete
         // cleanup task will still run to do the normal cleanup.
         //
         // verify( entityBatch, never() ).execute();
@@ -276,7 +275,7 @@ public class EntityVersionCleanupTaskTest {
 
 
     @Test(timeout=10000)
-    public void singleListenerSingleVersion() 
+    public void singleListenerSingleVersion()
             throws ExecutionException, InterruptedException, ConnectionException {
 
 
@@ -317,14 +316,14 @@ public class EntityVersionCleanupTaskTest {
         final Id applicationId = new SimpleId( "application" );
 
 
-        final CollectionScope appScope = new CollectionScopeImpl( 
+        final CollectionScope appScope = new CollectionScopeImpl(
                 applicationId, applicationId, "users" );
 
         final Id entityId = new SimpleId( "user" );
 
 
         //mock up a single log entry for our first test
-        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( 
+        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock(
                 mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
 
 
@@ -361,10 +360,10 @@ public class EntityVersionCleanupTaskTest {
 
         final List<MvccEntity> mel = new ArrayList<MvccEntity>();
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
 
         when( ess.loadDescendingHistory( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
@@ -421,7 +420,7 @@ public class EntityVersionCleanupTaskTest {
         final int sizeToReturn = 10;
 
 
-        final CountDownLatch latch = new CountDownLatch( 
+        final CountDownLatch latch = new CountDownLatch(
                 sizeToReturn/serializationFig.getBufferSize() * 3 );
 
         final EntityVersionDeletedTest listener1 = new EntityVersionDeletedTest( latch );
@@ -436,13 +435,13 @@ public class EntityVersionCleanupTaskTest {
 
         final Id applicationId = new SimpleId( "application" );
 
-        final CollectionScope appScope = new CollectionScopeImpl( 
+        final CollectionScope appScope = new CollectionScopeImpl(
                 applicationId, applicationId, "users" );
 
         final Id entityId = new SimpleId( "user" );
 
         // mock up a single log entry for our first test
-        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( 
+        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock(
                 mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
 
         final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
@@ -474,10 +473,10 @@ public class EntityVersionCleanupTaskTest {
 
         Entity entity = new Entity( entityId );
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.of(entity)) );
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.of(entity)) );
 
         when( ess.loadDescendingHistory( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
@@ -543,7 +542,7 @@ public class EntityVersionCleanupTaskTest {
 
         final int listenerCount = 5;
 
-        final CountDownLatch latch = new CountDownLatch( 
+        final CountDownLatch latch = new CountDownLatch(
                 sizeToReturn/serializationFig.getBufferSize() * listenerCount );
         final Semaphore waitSemaphore = new Semaphore( 0 );
 
@@ -565,14 +564,14 @@ public class EntityVersionCleanupTaskTest {
         final Id applicationId = new SimpleId( "application" );
 
 
-        final CollectionScope appScope = new CollectionScopeImpl( 
+        final CollectionScope appScope = new CollectionScopeImpl(
                 applicationId, applicationId, "users" );
 
         final Id entityId = new SimpleId( "user" );
 
 
         //mock up a single log entry for our first test
-        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( 
+        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock(
                 mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
 
 
@@ -768,7 +767,7 @@ public class EntityVersionCleanupTaskTest {
 
 
         @Override
-        public void versionDeleted( final CollectionScope scope, final Id entityId, 
+        public void versionDeleted( final CollectionScope scope, final Id entityId,
                 final List<MvccEntity> entityVersion ) {
             invocationLatch.countDown();
         }
@@ -786,7 +785,7 @@ public class EntityVersionCleanupTaskTest {
 
 
         @Override
-        public void versionDeleted( final CollectionScope scope, final Id entityId, 
+        public void versionDeleted( final CollectionScope scope, final Id entityId,
                 final List<MvccEntity> entityVersion ) {
 
             //wait for unblock to happen before counting down invocation latches

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
index f5dc24c..07fd7c6 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
@@ -5,7 +5,7 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
index 93cde8e..3db20b0 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
@@ -22,7 +22,7 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/EntityRepairImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/EntityRepairImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/EntityRepairImplTest.java
index 6194c80..8e48a69 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/EntityRepairImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/EntityRepairImplTest.java
@@ -28,7 +28,7 @@ import org.junit.Test;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
 import org.apache.usergrid.persistence.collection.serialization.impl.EntityRepairImpl;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java
index 6ce88cf..32a4d13 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java
@@ -33,13 +33,10 @@ import org.junit.Test;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
-import org.apache.usergrid.persistence.collection.util.EntityHelper;
 import org.apache.usergrid.persistence.collection.util.EntityUtils;
 import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
 import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
-import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1_2Test.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1_2Test.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1_2Test.java
index 9563258..cf98aab 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1_2Test.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1_2Test.java
@@ -20,12 +20,12 @@
 package org.apache.usergrid.persistence.collection.serialization.impl;
 
 
+import org.apache.usergrid.persistence.core.guice.V2Impl;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.runner.RunWith;
 
 import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.core.guice.ProxyImpl;
 import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
 import org.apache.usergrid.persistence.core.test.ITRunner;
@@ -67,7 +67,7 @@ public class MvccEntitySerializationStrategyProxyV1_2Test extends MvccEntitySeri
         existingVersion = migrationInfoSerialization.getVersion();
 
         //set our version to 0 so it uses both impls of the proxy
-        migrationInfoSerialization.setVersion( MvccEntitySerializationStrategyProxyV1Impl.MIGRATION_VERSION-1 );
+        migrationInfoSerialization.setVersion( V2Impl.MIGRATION_VERSION-1 );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1_3Test.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1_3Test.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1_3Test.java
index 67041f3..fb242f3 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1_3Test.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1_3Test.java
@@ -21,13 +21,12 @@ package org.apache.usergrid.persistence.collection.serialization.impl;
 
 
 import org.apache.usergrid.persistence.core.guice.V1ProxyImpl;
+import org.apache.usergrid.persistence.core.guice.V2Impl;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.runner.RunWith;
 
 import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.core.guice.ProxyImpl;
 import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
 import org.apache.usergrid.persistence.core.test.ITRunner;
 import org.apache.usergrid.persistence.core.test.UseModules;
@@ -67,7 +66,7 @@ public class MvccEntitySerializationStrategyProxyV1_3Test extends MvccEntitySeri
         existingVersion = migrationInfoSerialization.getVersion();
 
         //set our new version, so that is will run through the new code
-        migrationInfoSerialization.setVersion( MvccEntitySerializationStrategyProxyV1Impl.MIGRATION_VERSION );
+        migrationInfoSerialization.setVersion( V2Impl.MIGRATION_VERSION );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Test.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Test.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Test.java
index 2e037a2..2524026 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Test.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Test.java
@@ -18,9 +18,8 @@ package org.apache.usergrid.persistence.collection.serialization.impl;
 
 import com.google.inject.Inject;
 import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.core.guice.ProxyImpl;
-import org.apache.usergrid.persistence.core.guice.V1ProxyImpl;
+import org.apache.usergrid.persistence.core.guice.V3Impl;
 import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
 import org.apache.usergrid.persistence.core.test.ITRunner;
 import org.apache.usergrid.persistence.core.test.UseModules;
@@ -63,7 +62,7 @@ public class MvccEntitySerializationStrategyProxyV2Test extends MvccEntitySerial
         existingVersion = migrationInfoSerialization.getVersion();
 
         //set our new version, so that is will run through the new code
-        migrationInfoSerialization.setVersion( MvccEntitySerializationStrategyProxyV2Impl.MIGRATION_VERSION );
+        migrationInfoSerialization.setVersion( V3Impl.MIGRATION_VERSION );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1ImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1ImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1ImplTest.java
index c6f953e..8e7479a 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1ImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1ImplTest.java
@@ -20,13 +20,9 @@
 package org.apache.usergrid.persistence.collection.serialization.impl;
 
 
-import java.util.Iterator;
-
 import org.junit.runner.RunWith;
 
-import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.core.guice.V1Impl;
 import org.apache.usergrid.persistence.core.test.ITRunner;
 import org.apache.usergrid.persistence.core.test.UseModules;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2ImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2ImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2ImplTest.java
index 39559af..faeb5f6 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2ImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2ImplTest.java
@@ -20,14 +20,9 @@
 package org.apache.usergrid.persistence.collection.serialization.impl;
 
 
-import java.util.Iterator;
-
 import org.junit.runner.RunWith;
 
-import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.util.EntityHelper;
 import org.apache.usergrid.persistence.core.guice.V2Impl;
 import org.apache.usergrid.persistence.core.test.ITRunner;
 import org.apache.usergrid.persistence.core.test.UseModules;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3ImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3ImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3ImplTest.java
index f5c27bc..8ffffc4 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3ImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3ImplTest.java
@@ -1,11 +1,7 @@
 package org.apache.usergrid.persistence.collection.serialization.impl;
 
 import com.google.inject.Inject;
-import com.netflix.astyanax.Keyspace;
 import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
 import org.apache.usergrid.persistence.core.guice.V3Impl;
 import org.apache.usergrid.persistence.core.test.ITRunner;
 import org.apache.usergrid.persistence.core.test.UseModules;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Test.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Test.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Test.java
index 795ff9a..9a1400a 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Test.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Test.java
@@ -5,7 +5,6 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.util.EntityHelper;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CPManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CPManager.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CPManager.java
new file mode 100644
index 0000000..347080f
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CPManager.java
@@ -0,0 +1,28 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+
+package org.apache.usergrid.persistence.core;
+/**
+ * Base Manager Class
+ */
+
+public interface CPManager {
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index faa4e39..f2adee5 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -66,7 +66,6 @@ public class CommonModule extends AbstractModule {
         Multibinder<Migration> migrationBinding = Multibinder.newSetBinder( binder(), Migration.class );
         migrationBinding.addBinding().to( Key.get( MigrationInfoSerialization.class ) );
 
-
         bind( TimeService.class ).to( TimeServiceImpl.class );
 
         bind( CassandraConfig.class ).to( CassandraConfigImpl.class );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CurrentImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CurrentImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CurrentImpl.java
deleted file mode 100644
index cb52892..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CurrentImpl.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.persistence.core.guice;
-
-import com.google.inject.BindingAnnotation;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-/**
- * Classy class class.
- */
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-public @interface CurrentImpl {
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/PreviousImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/PreviousImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/PreviousImpl.java
deleted file mode 100644
index 526eb3b..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/PreviousImpl.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.persistence.core.guice;
-
-import com.google.inject.BindingAnnotation;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-/**
- * Classy class class.
- */
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-public @interface  PreviousImpl {
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/V2Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/V2Impl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/V2Impl.java
index dd01ff5..0ee87f0 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/V2Impl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/V2Impl.java
@@ -34,4 +34,6 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
  */
 @BindingAnnotation
 @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-public @interface V2Impl {}
\ No newline at end of file
+public @interface V2Impl {
+    public static final int MIGRATION_VERSION = 3;
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/V3Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/V3Impl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/V3Impl.java
index 3aedfff..aef3a9a 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/V3Impl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/V3Impl.java
@@ -34,4 +34,6 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
  */
 @BindingAnnotation
 @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-public @interface V3Impl {}
\ No newline at end of file
+public @interface V3Impl {
+    public static final int MIGRATION_VERSION = 4;
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
index 775df5d..f945375 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
@@ -19,6 +19,9 @@
 package org.apache.usergrid.persistence.core.migration.data;
 
 
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import rx.Observable;
+
 /**
  * An interface for updating data.  Has 2 basic functions. First it will perform the migration and update the status
  * object.
@@ -45,7 +48,7 @@ public interface DataMigration {
      * @param observer
      * @throws Throwable
      */
-    public void migrate(final ProgressObserver observer) throws Throwable;
+    public Observable migrate(final ApplicationEntityGroup applicationEntityGroup,final ProgressObserver observer) throws Throwable;
 
     /**
      * Get the version of this migration.  It must be unique.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
index a9719b7..5b8920e 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
@@ -27,6 +27,8 @@ import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,6 +40,9 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
 
 
 @Singleton
@@ -48,13 +53,14 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
     private final TreeMap<Integer, DataMigration> migrationTreeMap = new TreeMap<>();
 
     private final MigrationInfoSerialization migrationInfoSerialization;
+    private final AllEntitiesInSystemObservable allEntitiesInSystemObservable;
 
     /**
      * Cache to cache versions temporarily
      */
     private final LoadingCache<String, Integer> versionCache = CacheBuilder.newBuilder()
             //cache the local value for 1 minute
-            .expireAfterWrite( 1, TimeUnit.MINUTES ).build( new CacheLoader<String, Integer>() {
+            .expireAfterWrite(1, TimeUnit.MINUTES).build( new CacheLoader<String, Integer>() {
                 @Override
                 public Integer load( final String key ) throws Exception {
                     return migrationInfoSerialization.getVersion();
@@ -64,14 +70,17 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
 
     @Inject
     public DataMigrationManagerImpl( final MigrationInfoSerialization migrationInfoSerialization,
-                                     final Set<DataMigration> migrations ) {
+                                     final Set<DataMigration> migrations,
+                                     final AllEntitiesInSystemObservable allEntitiesInSystemObservable
+    ) {
 
-        Preconditions.checkNotNull( migrationInfoSerialization, 
+        Preconditions.checkNotNull( migrationInfoSerialization,
                 "migrationInfoSerialization must not be null" );
         Preconditions.checkNotNull( migrations, "migrations must not be null" );
+        Preconditions.checkNotNull( allEntitiesInSystemObservable, "allentitiesobservable must not be null" );
 
         this.migrationInfoSerialization = migrationInfoSerialization;
-
+        this.allEntitiesInSystemObservable = allEntitiesInSystemObservable;
 
         for ( DataMigration migration : migrations ) {
 
@@ -89,8 +98,8 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
                 final Class<? extends DataMigration> currentClass = migration.getClass();
 
 
-                throw new DataMigrationException( String.format( 
-                        "Data migrations must be unique.  Both classes %s and %s have version %d", 
+                throw new DataMigrationException( String.format(
+                        "Data migrations must be unique.  Both classes %s and %s have version %d",
                         existingClass, currentClass, version ) );
             }
 
@@ -114,49 +123,53 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
                 migrationTreeMap.lastKey() );
 
         //we have our migrations to run, execute them
-        final NavigableMap<Integer, DataMigration> migrationsToRun = 
+        final NavigableMap<Integer, DataMigration> migrationsToRun =
                 migrationTreeMap.tailMap( currentVersion, false );
 
-        CassandraProgressObserver observer = new CassandraProgressObserver();
-
-
-        for ( DataMigration migration : migrationsToRun.values() ) {
-
-            migrationInfoSerialization.setStatusCode( StatusCode.RUNNING.status );
-
-            final int migrationVersion = migration.getVersion();
+        final CassandraProgressObserver observer = new CassandraProgressObserver();
 
-            LOG.info( "Running migration version {}", migrationVersion );
+        allEntitiesInSystemObservable.getAllEntitiesInSystem(1000)
+            .doOnNext(
+                new Action1<ApplicationEntityGroup>() {
+                    @Override
+                    public void call(
+                        final ApplicationEntityGroup applicationEntityGroup) {
+                        for (DataMigration migration : migrationsToRun.values()) {
 
-            observer.update( migrationVersion, "Starting migration" );
+                            migrationInfoSerialization.setStatusCode(StatusCode.RUNNING.status);
 
+                            final int migrationVersion = migration.getVersion();
 
-            //perform this migration, if it fails, short circuit
-            try {
-                migration.migrate( observer );
-            }
-            catch ( Throwable throwable ) {
-                observer.failed( migrationVersion, "Exception thrown during migration", throwable );
+                            LOG.info("Running migration version {}", migrationVersion);
 
-                LOG.error( "Unable to migrate to version {}.", migrationVersion, throwable );
+                            observer.update(migrationVersion, "Starting migration");
 
-                return;
-            }
+                            //perform this migration, if it fails, short circuit
+                            try {
+                                migration.migrate(applicationEntityGroup, observer).toBlocking().last();
+                            } catch (Throwable throwable) {
+                                observer.failed(migrationVersion, "Exception thrown during migration", throwable);
+                                LOG.error("Unable to migrate to version {}.", migrationVersion, throwable);
+                                return ;
+                            }
 
-            //we had an unhandled exception or the migration failed, short circuit
-            if ( observer.failed ) {
-                return;
-            }
+                            //we had an unhandled exception or the migration failed, short circuit
+                            if (observer.failed) {
+                                return ;
+                            }
 
-            //set the version
-            migrationInfoSerialization.setVersion( migrationVersion );
+                            //set the version
+                            migrationInfoSerialization.setVersion(migrationVersion);
 
-            versionCache.invalidateAll();
-
-            //update the observer for progress so other nodes can see it
-            observer.update( migrationVersion, "Completed successfully" );
-        }
+                            versionCache.invalidateAll();
 
+                            //update the observer for progress so other nodes can see it
+                            observer.update(migrationVersion, "Completed successfully");
+                        }
+                        return ;
+                    }
+                }).toBlocking().lastOrDefault(null);
+        ;
         migrationInfoSerialization.setStatusCode( StatusCode.COMPLETE.status );
     }
 
@@ -188,7 +201,7 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
     public void resetToVersion( final int version ) {
         final int highestAllowed = migrationTreeMap.lastKey();
 
-        Preconditions.checkArgument( version <= highestAllowed, 
+        Preconditions.checkArgument( version <= highestAllowed,
                 "You cannot set a version higher than the max of " + highestAllowed);
         Preconditions.checkArgument( version >= 0, "You must specify a version of 0 or greater" );
 
@@ -225,7 +238,7 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
         @Override
         public void failed( final int migrationVersion, final String reason ) {
 
-            final String storedMessage = String.format( 
+            final String storedMessage = String.format(
                     "Failed to migrate, reason is appended.  Error '%s'", reason );
 
 
@@ -245,13 +258,13 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
             throwable.printStackTrace( new PrintWriter( stackTrace ) );
 
 
-            final String storedMessage = String.format( 
+            final String storedMessage = String.format(
                 "Failed to migrate, reason is appended.  Error '%s' %s", reason, stackTrace.toString() );
 
             update( migrationVersion, storedMessage );
 
 
-            LOG.error( "Unable to migrate version {} due to reason {}.", 
+            LOG.error( "Unable to migrate version {} due to reason {}.",
                     migrationVersion, reason, throwable );
 
             failed = true;
@@ -262,7 +275,7 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
 
         @Override
         public void update( final int migrationVersion, final String message ) {
-            final String formattedOutput = String.format( 
+            final String formattedOutput = String.format(
                     "Migration version %d.  %s", migrationVersion, message );
 
             //Print this to the info log

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java
new file mode 100644
index 0000000..758f54e
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java
@@ -0,0 +1,58 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.core.migration.schema;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+import rx.Observable;
+import rx.functions.Func1;
+
+
+/**
+ * Interface to encapsulate directional migrations
+ */
+public interface MigrationStrategy<T> extends DataMigration {
+    /**
+     * Returns the migration pattern to use
+     * @return
+     */
+    public MigrationRelationship<T> getMigration();
+
+    public class MigrationRelationship<T>  {
+        private final T from;
+        private final T to;
+
+        public MigrationRelationship(T from,T to){
+            this.from = from;
+            this.to = to;
+        }
+        public T from(){
+            return from;
+        }
+
+        public T to(){
+            return to;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
new file mode 100644
index 0000000..1ec017e
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
@@ -0,0 +1,40 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.core.rx;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import rx.Observable;
+
+
+/**
+ * An observable that will emit every entity Id stored in our entire system across all apps.
+ * Note that this only walks each application applicationId graph, and emits edges from the applicationId and it's edges as the s
+ * source node
+ */
+public interface AllEntitiesInSystemObservable {
+    /**
+     * Return an observable that emits all entities in the system.
+     *
+     * @param bufferSize The amount of entityIds to buffer into each ApplicationEntityGroup.  Note that if we exceed the buffer size
+     *                   you may be more than 1 ApplicationEntityGroup with the same application and different ids
+     */
+    public Observable<ApplicationEntityGroup> getAllEntitiesInSystem(final int bufferSize);
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ApplicationObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ApplicationObservable.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ApplicationObservable.java
new file mode 100644
index 0000000..4b70462
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ApplicationObservable.java
@@ -0,0 +1,33 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.core.rx;
+
+import org.apache.usergrid.persistence.model.entity.Id;
+import rx.Observable;
+
+/**
+ * Classy class class.
+ */
+public interface ApplicationObservable {
+    /**
+     * Get all applicationIds as an observable
+     */
+    Observable<Id> getAllApplicationIds();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
new file mode 100644
index 0000000..a767f4a
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
@@ -0,0 +1,40 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.core.scope;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import java.util.List;
+
+/**
+ * Get the entity data.  Immutable bean for fast access
+ */
+public final class ApplicationEntityGroup {
+    public final ApplicationScope applicationScope;
+    public final List<Id> entityIds;
+
+    public ApplicationEntityGroup(final ApplicationScope applicationScope, final List<Id> entityIds) {
+        this.applicationScope = applicationScope;
+        this.entityIds = entityIds;
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScope.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScope.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScope.java
index 1f74acc..920421d 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScope.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScope.java
@@ -31,4 +31,5 @@ public interface ApplicationScope {
      * Get an Application scope
      */
     Id getApplication();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
index be45ab5..8184f09 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
@@ -21,15 +21,19 @@ package org.apache.usergrid.persistence.core.guice;
 
 
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import rx.Observable;
 
 
 /**
  * A simple migration that sets the version to max. This way our integration tests always test the latest code
  */
 public class MaxMigrationVersion implements DataMigration {
+
     @Override
-    public void migrate( final ProgressObserver observer ) throws Throwable {
+    public Observable migrate(final ApplicationEntityGroup applicationEntityGroup, final ProgressObserver observer ) throws Throwable {
          //no op, just needs to run to be set
+        return Observable.empty();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
index 3ffcd89..8ce5d9f 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
@@ -25,11 +25,14 @@ package org.apache.usergrid.persistence.core.migration.data;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import org.apache.usergrid.persistence.core.migration.schema.MigrationException;
+import rx.Observable;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -46,16 +49,21 @@ import static org.mockito.Mockito.when;
  */
 public class DataMigrationManagerImplTest {
 
+    AllEntitiesInSystemObservable allEntitiesInSystemObservable = new AllEntitiesInSystemObservable() {
+        @Override
+        public Observable<ApplicationEntityGroup> getAllEntitiesInSystem(int bufferSize) {
+            
+            return null;
+        }
+    };
 
     @Test
     public void noMigrations() throws MigrationException {
         final MigrationInfoSerialization serialization = mock( MigrationInfoSerialization.class );
 
-
         Set<DataMigration> emptyMigration = new HashSet<>();
 
-
-        DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, emptyMigration );
+        DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, emptyMigration, allEntitiesInSystemObservable );
 
         migrationManager.migrate();
 
@@ -82,13 +90,13 @@ public class DataMigrationManagerImplTest {
         migrations.add( v2 );
 
 
-        DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, migrations );
+        DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, migrations,allEntitiesInSystemObservable );
 
         migrationManager.migrate();
 
 
-        verify( v1 ).migrate( any( DataMigration.ProgressObserver.class ) );
-        verify( v2 ).migrate( any( DataMigration.ProgressObserver.class ) );
+        verify( v1 ).migrate(any(ApplicationEntityGroup.class), any( DataMigration.ProgressObserver.class ) );
+        verify( v2 ).migrate(any(ApplicationEntityGroup.class), any( DataMigration.ProgressObserver.class ) );
 
         //verify we set the running status
         verify( serialization, times( 2 ) ).setStatusCode( DataMigrationManagerImpl.StatusCode.RUNNING.status );
@@ -115,8 +123,8 @@ public class DataMigrationManagerImplTest {
         when( v1.getVersion() ).thenReturn( 1 );
 
         //throw an exception
-        doThrow( new RuntimeException( "Something bad happened" ) ).when( v1 ).migrate(
-                any( DataMigration.ProgressObserver.class ) );
+        doThrow( new RuntimeException( "Something bad happened" ) ).when( v1 ).migrate(any(ApplicationEntityGroup.class),
+                any(DataMigration.ProgressObserver.class) );
 
         final DataMigration v2 = mock( DataMigration.class );
         when( v2.getVersion() ).thenReturn( 2 );
@@ -127,15 +135,15 @@ public class DataMigrationManagerImplTest {
         migrations.add( v2 );
 
 
-        DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, migrations );
+        DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, migrations,allEntitiesInSystemObservable );
 
         migrationManager.migrate();
 
 
-        verify( v1 ).migrate( any( DataMigration.ProgressObserver.class ) );
+        verify( v1 ).migrate( any(ApplicationEntityGroup.class),any( DataMigration.ProgressObserver.class ) );
 
         //verify we don't run migration
-        verify( v2, never() ).migrate( any( DataMigration.ProgressObserver.class ) );
+        verify( v2, never() ).migrate( any(ApplicationEntityGroup.class),any( DataMigration.ProgressObserver.class ) );
 
         //verify we set the running status
         verify( serialization, times( 1 ) ).setStatusCode( DataMigrationManagerImpl.StatusCode.RUNNING.status );
@@ -177,7 +185,7 @@ public class DataMigrationManagerImplTest {
                 progressObserver.failed( returnedCode, reason );
                 return null;
             }
-        } ).when( v1 ).migrate( any( DataMigration.ProgressObserver.class ) );
+        } ).when( v1 ).migrate(any(ApplicationEntityGroup.class), any( DataMigration.ProgressObserver.class ) );
 
 
 
@@ -190,15 +198,15 @@ public class DataMigrationManagerImplTest {
         migrations.add( v2 );
 
 
-        DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, migrations );
+        DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, migrations,allEntitiesInSystemObservable );
 
         migrationManager.migrate();
 
 
-        verify( v1 ).migrate( any( DataMigration.ProgressObserver.class ) );
+        verify( v1 ).migrate(any(ApplicationEntityGroup.class), any( DataMigration.ProgressObserver.class ) );
 
         //verify we don't run migration
-        verify( v2, never() ).migrate( any( DataMigration.ProgressObserver.class ) );
+        verify( v2, never() ).migrate( any(ApplicationEntityGroup.class),any( DataMigration.ProgressObserver.class ) );
 
         //verify we set the running status
         verify( serialization, times( 1 ) ).setStatusCode( DataMigrationManagerImpl.StatusCode.RUNNING.status );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
index aa1a4a8..4c38c13 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.persistence.graph;
 
 
+import org.apache.usergrid.persistence.core.CPManager;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import rx.Observable;
@@ -50,7 +51,7 @@ import rx.Observable;
  * @author tnine
  * @see Edge
  */
-public interface GraphManager {
+public interface GraphManager extends CPManager {
 
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
index c199312..eb49711 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
@@ -37,4 +37,6 @@ public interface GraphManagerFactory
      * @param collectionScope The context to use when creating the graph manager
      */
     public GraphManager createEdgeManager( ApplicationScope collectionScope );
+
+    void invalidate();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 7041116..4786a42 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -21,13 +21,14 @@ package org.apache.usergrid.persistence.graph.guice;
 
 import org.apache.usergrid.persistence.core.guice.V1Impl;
 import org.apache.usergrid.persistence.core.guice.V2Impl;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.graph.serialization.*;
+import org.apache.usergrid.persistence.graph.serialization.impl.*;
 import org.safehaus.guicyfig.GuicyFigModule;
 
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.consistency.TimeServiceImpl;
 import org.apache.usergrid.persistence.core.guice.ProxyImpl;
-import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
 import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
 import org.apache.usergrid.persistence.core.task.TaskExecutor;
@@ -43,14 +44,6 @@ import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepair;
 import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepairImpl;
 import org.apache.usergrid.persistence.graph.impl.stage.NodeDeleteListener;
 import org.apache.usergrid.persistence.graph.impl.stage.NodeDeleteListenerImpl;
-import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
-import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationProxyImpl;
-import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationV1Impl;
-import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationV2Impl;
-import org.apache.usergrid.persistence.graph.serialization.impl.EdgeSerializationImpl;
-import org.apache.usergrid.persistence.graph.serialization.impl.NodeSerializationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
@@ -77,7 +70,6 @@ import com.google.inject.Provides;
 import com.google.inject.Singleton;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.multibindings.Multibinder;
-import com.netflix.astyanax.Keyspace;
 
 
 public class GraphModule extends AbstractModule {
@@ -93,16 +85,21 @@ public class GraphModule extends AbstractModule {
 
         bind( TimeService.class ).to( TimeServiceImpl.class );
 
-        // create a guice factory for getting our collection manager
-        install( new FactoryModuleBuilder().implement( GraphManager.class, GraphManagerImpl.class )
-                                           .build( GraphManagerFactory.class ) );
+        bind( GraphManagerFactory.class ).to(GraphManagerFactoryImpl.class);
 
+        bind(GraphManager.class).to(GraphManagerImpl.class );
+
+        bind(EdgesFromSourceObservable.class).to(EdgesFromSourceObservableImpl.class);
+
+        bind(TargetIdObservable.class).to(TargetIdObservableImpl.class);
+
+        bind(EdgesToTargetObservable.class).to(EdgesToTargetObservableImpl.class);
 
         /**
          * bindings for shard allocations
          */
 
-        bind( NodeShardAllocation.class ).to( NodeShardAllocationImpl.class );
+        bind(NodeShardAllocation.class).to( NodeShardAllocationImpl.class );
         bind( NodeShardApproximation.class ).to( NodeShardApproximationImpl.class );
         bind( NodeShardCache.class ).to( NodeShardCacheImpl.class );
         bind( NodeShardCounterSerialization.class ).to( NodeShardCounterSerializationImpl.class );
@@ -118,6 +115,9 @@ public class GraphModule extends AbstractModule {
         bind( EdgeMetaRepair.class ).to( EdgeMetaRepairImpl.class );
         bind( EdgeDeleteRepair.class ).to( EdgeDeleteRepairImpl.class );
 
+        Multibinder<DataMigration> dataMigrationMultibinder =
+            Multibinder.newSetBinder( binder(), DataMigration.class );
+        dataMigrationMultibinder.addBinding().to( EdgeMetadataSerializationProxyImpl.class );
 
         /**
          * Add our listeners
@@ -167,6 +167,8 @@ public class GraphModule extends AbstractModule {
         bind(EdgeMetadataSerialization.class).annotatedWith( V1Impl.class ).to( EdgeMetadataSerializationV1Impl.class  );
         bind(EdgeMetadataSerialization.class).annotatedWith( V2Impl.class ).to( EdgeMetadataSerializationV2Impl.class  );
         bind(EdgeMetadataSerialization.class).annotatedWith( ProxyImpl.class ).to( EdgeMetadataSerializationProxyImpl.class  );
+        bind(EdgeMigrationStrategy.class).annotatedWith(ProxyImpl.class).to( EdgeMetadataSerializationProxyImpl.class  );
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMigrationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMigrationStrategy.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMigrationStrategy.java
new file mode 100644
index 0000000..ed46e43
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMigrationStrategy.java
@@ -0,0 +1,33 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization;
+
+import org.apache.usergrid.persistence.core.migration.schema.MigrationStrategy;
+import org.apache.usergrid.persistence.graph.Edge;
+
+import java.util.List;
+
+/**
+ * Classy class class.
+ */
+public interface EdgeMigrationStrategy extends MigrationStrategy<EdgeMetadataSerialization> {
+    public static final int MIGRATION_VERSION = 2;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesFromSourceObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesFromSourceObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesFromSourceObservable.java
new file mode 100644
index 0000000..35ad41c
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesFromSourceObservable.java
@@ -0,0 +1,32 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization;
+
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.model.entity.Id;
+import rx.Observable;
+
+/**
+ * Classy class class.
+ */
+public interface EdgesFromSourceObservable {
+    Observable<Edge> edgesFromSource( final GraphManager gm, final Id sourceNode);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesToTargetObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesToTargetObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesToTargetObservable.java
new file mode 100644
index 0000000..cfec5bd
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesToTargetObservable.java
@@ -0,0 +1,32 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization;
+
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.model.entity.Id;
+import rx.Observable;
+
+/**
+ * Classy class class.
+ */
+public interface EdgesToTargetObservable {
+    Observable<Edge> getEdgesToTarget(final GraphManager gm,  final Id targetNode);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/TargetIdObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/TargetIdObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/TargetIdObservable.java
new file mode 100644
index 0000000..31164df
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/TargetIdObservable.java
@@ -0,0 +1,38 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization;
+
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.model.entity.Id;
+import rx.Observable;
+
+/**
+ * Emits the id of all nodes that are target nodes from the given source node
+ */
+public interface TargetIdObservable {
+    /**
+     * Get all nodes that are target nodes from the sourceNode
+     * @param gm
+     * @param sourceNode
+     *
+     * @return
+     */
+    Observable<Id> getTargetNodes(GraphManager gm, Id sourceNode);
+}


[3/4] incubator-usergrid git commit: pushing migrations into core persistence

Posted by sf...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
index 266fb17..3da0b85 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
@@ -23,6 +23,8 @@ package org.apache.usergrid.corepersistence.migration;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -31,7 +33,7 @@ import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.corepersistence.CpSetup;
 import org.apache.usergrid.corepersistence.EntityWriteHelper;
 import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
@@ -69,9 +71,9 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
      * Rule to do the resets we need
      */
     @Rule
-    public MigrationTestRule migrationTestRule = new MigrationTestRule( 
+    public MigrationTestRule migrationTestRule = new MigrationTestRule(
             app, CpSetup.getInjector() ,EntityTypeMappingMigration.class  );
-
+    private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
 
 
     @Before
@@ -82,6 +84,7 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
         keyspace = injector.getInstance( Keyspace.class );
         managerCache = injector.getInstance( ManagerCache.class );
         dataMigrationManager = injector.getInstance( DataMigrationManager.class );
+        allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
     }
 
 
@@ -114,62 +117,69 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
         keyspace.truncateColumnFamily( MapSerializationImpl.MAP_ENTRIES );
         keyspace.truncateColumnFamily( MapSerializationImpl.MAP_KEYS );
 
-        app.createApplication( 
-                GraphShardVersionMigrationIT.class.getSimpleName()+ UUIDGenerator.newTimeUUID(), 
+        app.createApplication(
+                GraphShardVersionMigrationIT.class.getSimpleName()+ UUIDGenerator.newTimeUUID(),
                 "migrationTest" );
 
 
 
         final TestProgressObserver progressObserver = new TestProgressObserver();
 
-        entityTypeMappingMigration.migrate( progressObserver );
-
+        allEntitiesInSystemObservable.getAllEntitiesInSystem(  1000)
+            .doOnNext(new Action1<ApplicationEntityGroup>() {
+                @Override
+                public void call(final ApplicationEntityGroup entity) {
+                    try {
+                        entityTypeMappingMigration.migrate(entity, progressObserver).toBlocking().last();
+                    }catch (Throwable e ){
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
 
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
-            .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+        allEntitiesInSystemObservable.getAllEntitiesInSystem(1000)
+            .doOnNext(new Action1<ApplicationEntityGroup>() {
                 @Override
                 public void call(
-                        final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
+                    final ApplicationEntityGroup entity) {
                     //ensure that each one has a type
 
                     final EntityManager em = emf.getEntityManager(
-                            entity.applicationScope.getApplication().getUuid() );
+                        entity.applicationScope.getApplication().getUuid());
 
-                    for ( final Id id : entity.entityIds ) {
+                    for (final Id id : entity.entityIds) {
                         try {
-                            final Entity returned = em.get( id.getUuid() );
+                            final Entity returned = em.get(id.getUuid());
 
                             //we seem to occasionally get phantom edges.  If this is the
                             // case we'll store the type _> uuid mapping, but we won't have
                             // anything to load
 
-                            if ( returned != null ) {
-                                assertEquals( id.getUuid(), returned.getUuid() );
-                                assertEquals( id.getType(), returned.getType() );
-                            }
-                            else {
-                                final String type = managerCache.getMapManager( CpNamingUtils
-                                        .getEntityTypeMapScope(
-                                                entity.applicationScope.getApplication() ) )
-                                                                .getString( id.getUuid()
-                                                                            .toString() );
-
-                                assertEquals( id.getType(), type );
+                            if (returned != null) {
+                                assertEquals(id.getUuid(), returned.getUuid());
+                                assertEquals(id.getType(), returned.getType());
+                            } else {
+                                final String type = managerCache.getMapManager(CpNamingUtils
+                                    .getEntityTypeMapScope(
+                                        entity.applicationScope.getApplication()))
+                                    .getString(id.getUuid()
+                                        .toString());
+
+                                assertEquals(id.getType(), type);
                             }
-                        }
-                        catch ( Exception e ) {
-                            throw new RuntimeException( "Unable to get entity " + id
-                                    + " by UUID, migration failed", e );
-                        }
+                        } catch (Exception e) {
+                            throw new RuntimeException("Unable to get entity " + id
+                                + " by UUID, migration failed", e);
+                                    }
 
-                        allEntities.remove( id );
-                    }
-                }
-            } ).toBlocking().lastOrDefault( null );
+                                    allEntities.remove(id);
+                                }
+                            }
+                        }).toBlocking().lastOrDefault(null);
 
 
-        assertEquals( "Every element should have been encountered", 0, allEntities.size() );
-        assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
-        assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
-    }
-}
+                    assertEquals("Every element should have been encountered", 0, allEntities.size());
+                    assertFalse("Progress observer should not have failed", progressObserver.getFailed());
+                    assertTrue("Progress observer should have update messages", progressObserver.getUpdates().size() > 0);
+                }
+            }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
index f287047..2d6d0d9 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
@@ -23,6 +23,10 @@ package org.apache.usergrid.corepersistence.migration;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationProxyImpl;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -31,7 +35,7 @@ import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.corepersistence.CpSetup;
 import org.apache.usergrid.corepersistence.EntityWriteHelper;
 import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
@@ -55,7 +59,7 @@ import org.junit.Ignore;
 public class GraphShardVersionMigrationIT extends AbstractCoreIT {
 
     private Injector injector;
-    private GraphShardVersionMigration graphShardVersionMigration;
+    private DataMigration graphShardVersionMigration;
     private ManagerCache managerCache;
     private DataMigrationManager dataMigrationManager;
     private MigrationInfoSerialization migrationInfoSerialization;
@@ -65,18 +69,18 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
      * Rule to do the resets we need
      */
     @Rule
-    public MigrationTestRule migrationTestRule = new MigrationTestRule( app, CpSetup.getInjector() ,GraphShardVersionMigration.class  );
-
+    public MigrationTestRule migrationTestRule = new MigrationTestRule( app, CpSetup.getInjector() ,EdgeMetadataSerializationProxyImpl.class  );
+    private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
 
 
     @Before
     public void setup() {
         injector = CpSetup.getInjector();
-        graphShardVersionMigration = injector.getInstance( GraphShardVersionMigration.class );
+        graphShardVersionMigration = injector.getInstance( EdgeMetadataSerializationProxyImpl.class );
         managerCache = injector.getInstance( ManagerCache.class );
         dataMigrationManager = injector.getInstance( DataMigrationManager.class );
         migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
-
+        allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
     }
 
 
@@ -114,11 +118,11 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
 
         //read everything in previous version format and put it into our types.
 
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
-                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+        allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
+                                     .doOnNext( new Action1<ApplicationEntityGroup>() {
                                          @Override
                                          public void call(
-                                                 final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
+                                                 final ApplicationEntityGroup entity ) {
 
                                              final GraphManager gm =
                                                      managerCache.getGraphManager( entity.applicationScope );
@@ -154,7 +158,18 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
 
 
         //perform the migration
-        graphShardVersionMigration.migrate( progressObserver );
+        allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
+            .doOnNext( new Action1<ApplicationEntityGroup>() {
+                @Override
+                public void call(
+                    final ApplicationEntityGroup entity) {
+                    try {
+                        graphShardVersionMigration.migrate(entity, progressObserver).toBlocking().last();
+                    }catch (Throwable e){
+                        throw new RuntimeException(e);
+                    }
+                }
+            }).toBlocking().last();
 
         assertEquals( "Newly saved entities encounterd", 0, allEntities.size() );
         assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
@@ -171,12 +186,11 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
 
 
         //now visit all nodes in the system and remove their types from the multi maps, it should be empty at the end
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
-                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+        allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
+                                     .doOnNext( new Action1<ApplicationEntityGroup>() {
                                                     @Override
                                                     public void call(
-                                                            final AllEntitiesInSystemObservable
-                                                                    .ApplicationEntityGroup entity ) {
+                                                            final ApplicationEntityGroup entity ) {
 
                                                         final GraphManager gm =
                                                                 managerCache.getGraphManager( entity.applicationScope );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
index 4d1c6c9..30c9ac3 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
@@ -23,6 +23,11 @@ package org.apache.usergrid.corepersistence.rx;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +60,9 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
     @Test
     public void testEntities() throws Exception {
 
+        AllEntitiesInSystemObservable allEntitiesInSystemObservableImpl = CpSetup.getInjector().getInstance(AllEntitiesInSystemObservable.class);
+        TargetIdObservable targetIdObservable = CpSetup.getInjector().getInstance(TargetIdObservable.class);
+
         final EntityManager em = app.getEntityManager();
 
         final String type1 = "type1thing";
@@ -92,9 +100,9 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+        allEntitiesInSystemObservableImpl.getAllEntitiesInSystem( 1000).doOnNext( new Action1<ApplicationEntityGroup>() {
             @Override
-            public void call( final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
+            public void call( final ApplicationEntityGroup entity ) {
 
                 assertNotNull(entity);
                 assertNotNull(entity.applicationScope);
@@ -125,7 +133,7 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
 
         //test connections
 
-        TargetIdObservable.getTargetNodes( gm, source ).doOnNext( new Action1<Id>() {
+        targetIdObservable.getTargetNodes( gm, source ).doOnNext( new Action1<Id>() {
             @Override
             public void call( final Id target ) {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
index f8f3c50..7c902ea 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
@@ -24,21 +24,19 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
 import org.junit.Test;
 
 import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.corepersistence.CpSetup;
 import org.apache.usergrid.corepersistence.ManagerCache;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.entities.Application;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import rx.Observable;
 import rx.functions.Action1;
 
-import static junit.framework.Assert.assertNotNull;
 import static org.junit.Assert.assertEquals;
 
 
@@ -52,6 +50,7 @@ public class ApplicationObservableTestIT extends AbstractCoreIT {
 
         final Application createdApplication = app.getEntityManager().getApplication();
 
+        ApplicationObservable applicationObservable = CpSetup.getInjector().getInstance(ApplicationObservable.class);
 
         //now our get all apps we expect.  There may be more, but we don't care about those.
         final Set<UUID> applicationIds = new HashSet<UUID>() {{
@@ -66,7 +65,7 @@ public class ApplicationObservableTestIT extends AbstractCoreIT {
         //clean up our wiring
         ManagerCache managerCache = CpSetup.getInjector().getInstance( ManagerCache.class );
 
-        Observable<Id> appObservable = ApplicationObservable.getAllApplicationIds( managerCache );
+        Observable<Id> appObservable = applicationObservable.getAllApplicationIds();
 
         appObservable.doOnNext( new Action1<Id>() {
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
index 2aa7fbc..2564fe5 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.graph.serialization.EdgesToTargetObservable;
 import org.junit.Test;
 
 import org.apache.usergrid.AbstractCoreIT;
@@ -60,6 +61,7 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
     @Test
     public void testEntities() throws Exception {
 
+        EdgesToTargetObservable edgesToTargetObservable = CpSetup.getInjector().getInstance(EdgesToTargetObservable.class);
         final EntityManager em = app.getEntityManager();
         final Application createdApplication = em.getApplication();
 
@@ -96,7 +98,7 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        EdgesToTargetObservable.getEdgesToTarget( gm, target ).doOnNext( new Action1<Edge>() {
+        edgesToTargetObservable.getEdgesToTarget( gm, target ).doOnNext( new Action1<Edge>() {
             @Override
             public void call( final Edge edge ) {
                 final String edgeType = edge.getType();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
index ef0d953..5d25f62 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.graph.serialization.EdgesFromSourceObservable;
 import org.junit.Test;
 
 import org.apache.usergrid.AbstractCoreIT;
@@ -59,6 +60,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
     @Test
     public void testEntities() throws Exception {
 
+        EdgesFromSourceObservable edgesFromSourceObservable=  CpSetup.getInjector().getInstance(EdgesFromSourceObservable.class);
         final EntityManager em = app.getEntityManager();
 
         final String type1 = "type1things";
@@ -92,7 +94,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        EdgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( new Action1<Edge>() {
+        edgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( new Action1<Edge>() {
             @Override
             public void call( final Edge edge ) {
                 final String edgeType = edge.getType();
@@ -124,7 +126,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
 
         //test connections
 
-        EdgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( new Action1<Edge>() {
+        edgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( new Action1<Edge>() {
             @Override
             public void call( final Edge edge ) {
                 final String edgeType = edge.getType();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
index cde8866..e5b0319 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
 import org.junit.Test;
 
 import org.apache.usergrid.AbstractCoreIT;
@@ -59,6 +60,8 @@ public class TargetIdObservableTestIT extends AbstractCoreIT {
     @Test
     public void testEntities() throws Exception {
 
+        TargetIdObservable targetIdObservable = CpSetup.getInjector().getInstance(TargetIdObservable.class);
+
         final EntityManager em = app.getEntityManager();
 
 
@@ -93,7 +96,7 @@ public class TargetIdObservableTestIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        TargetIdObservable.getTargetNodes( gm, applicationId ).doOnNext( new Action1<Id>() {
+        targetIdObservable.getTargetNodes( gm, applicationId ).doOnNext( new Action1<Id>() {
             @Override
             public void call( final Id target ) {
 
@@ -116,7 +119,7 @@ public class TargetIdObservableTestIT extends AbstractCoreIT {
 
         //test connections
 
-        TargetIdObservable.getTargetNodes( gm, source ).doOnNext( new Action1<Id>() {
+        targetIdObservable.getTargetNodes( gm, source ).doOnNext( new Action1<Id>() {
             @Override
             public void call( final Id target ) {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index 4de18fe..90cade0 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -20,6 +20,8 @@ package org.apache.usergrid.persistence.collection;
 
 
 import java.util.Collection;
+
+import org.apache.usergrid.persistence.core.CPManager;
 import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -30,7 +32,7 @@ import rx.Observable;
 /**
  * The operations for performing changes on an entity
  */
-public interface EntityCollectionManager {
+public interface EntityCollectionManager extends CPManager {
 
     /**
      * Write the entity in the entity collection.  This is an entire entity, it's contents will
@@ -68,12 +70,12 @@ public interface EntityCollectionManager {
 
     /**
      * Takes the change and reloads an entity with all changes applied in this entity applied.
-     * The resulting entity from calling load will be the previous version of this entity plus 
+     * The resulting entity from calling load will be the previous version of this entity plus
      * the entity in this object applied to it.
      */
     public Observable<Entity> update ( Entity entity );
 
-    /** 
+    /**
      * Returns health of entity data store.
      */
     public Health getHealth();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
index ef579f8..9140913 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
@@ -20,40 +20,42 @@ package org.apache.usergrid.persistence.collection;
 
 
 /**
- * A basic factory that creates a collection manager with the given context. 
+ * A basic factory that creates a collection manager with the given context.
  * Each instance of this factory should exist for a Single ApplicationScope
  */
 public interface EntityCollectionManagerFactory {
 
     /**
-     * Create a new EntityCollectionManager for the given context. 
-     * The EntityCollectionManager can safely be used on the current thread 
+     * Create a new EntityCollectionManager for the given context.
+     * The EntityCollectionManager can safely be used on the current thread
      * and will shard responses.  The returned instance should not be shared
      * among threads it will not be guaranteed to be thread safe.
      *
-     * @param collectionScope The collectionScope collectionScope to use 
+     * @param collectionScope The collectionScope collectionScope to use
      * when creating the collectionScope manager
      *
      * @return The collectionScope manager to perform operations within the provided context
      */
-    public EntityCollectionManager 
+    public EntityCollectionManager
         createCollectionManager( CollectionScope collectionScope );
 
 
 
     /**
-     * Create a new EntityCollectionManagerSync for the given context. 
-     * The EntityCollectionManager can safely be used on the current thread 
+     * Create a new EntityCollectionManagerSync for the given context.
+     * The EntityCollectionManager can safely be used on the current thread
      * and will shard responses.  The returned instance should not be shared
-     * among threads it will not be guaranteed to be thread safe.  
+     * among threads it will not be guaranteed to be thread safe.
      * This implementation will be synchronous. Try to use the org.apache.usergrid.persistence.core.consistency
      * implementation if possible
      *
-     * @param collectionScope The collectionScope collectionScope to use when 
+     * @param collectionScope The collectionScope collectionScope to use when
      * creating the collectionScope manager
      *
      * @return The collectionScope manager to perform operations within the provided context
      */
-    public EntityCollectionManagerSync 
+    public EntityCollectionManagerSync
         createCollectionManagerSync( CollectionScope collectionScope );
+
+    void invalidate();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 1c3e258..c5dd961 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.collection.guice;
 
 
 
+import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerFactoryImpl;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
@@ -81,10 +82,7 @@ public class CollectionModule extends AbstractModule {
         Multibinder.newSetBinder( binder(), EntityDeleted.class );
 
         // create a guice factor for getting our collection manager
-        install( new FactoryModuleBuilder()
-            .implement( EntityCollectionManager.class, EntityCollectionManagerImpl.class )
-            .implement( EntityCollectionManagerSync.class, EntityCollectionManagerSyncImpl.class )
-            .build( EntityCollectionManagerFactory.class ) );
+       bind(EntityCollectionManagerFactory.class).to(EntityCollectionManagerFactoryImpl.class);
 
         bind( UniqueValueSerializationStrategy.class ).to( UniqueValueSerializationStrategyImpl.class );
         bind( ChangeLogGenerator.class).to( ChangeLogGeneratorImpl.class);
@@ -116,7 +114,7 @@ public class CollectionModule extends AbstractModule {
     @Provides
     @CollectionTaskExecutor
     public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
-        return new NamedTaskExecutorImpl( "collectiontasks", 
+        return new NamedTaskExecutorImpl( "collectiontasks",
                 serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
new file mode 100644
index 0000000..790be19
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -0,0 +1,137 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.collection.impl;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.assistedinject.Assisted;
+import com.netflix.astyanax.Keyspace;
+import org.apache.usergrid.persistence.collection.*;
+import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
+import org.apache.usergrid.persistence.collection.guice.Write;
+import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
+import org.apache.usergrid.persistence.collection.mvcc.stage.write.*;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.core.guice.ProxyImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Classy class class.
+ */
+public class EntityCollectionManagerFactoryImpl implements EntityCollectionManagerFactory {
+
+
+    private final WriteStart writeStart;
+    private final WriteStart writeUpdate;
+    private final WriteUniqueVerify writeVerifyUnique;
+    private final WriteOptimisticVerify writeOptimisticVerify;
+    private final WriteCommit writeCommit;
+    private final RollbackAction rollback;
+    private final MarkStart markStart;
+    private final MarkCommit markCommit;
+    private final MvccEntitySerializationStrategy entitySerializationStrategy;
+    private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+    private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
+    private final Keyspace keyspace;
+    private final SerializationFig config;
+    private final EntityVersionCleanupFactory entityVersionCleanupFactory;
+    private final EntityVersionCreatedFactory entityVersionCreatedFactory;
+    private final EntityDeletedFactory entityDeletedFactory;
+    private final TaskExecutor taskExecutor;
+    private final CollectionScope collectionScope;
+    private LoadingCache<CollectionScope, EntityCollectionManager> ecmCache =
+        CacheBuilder.newBuilder().maximumSize( 1000 )
+            .build( new CacheLoader<CollectionScope, EntityCollectionManager>() {
+                public EntityCollectionManager load( CollectionScope scope ) {
+                    return new EntityCollectionManagerImpl(
+                        writeStart,
+                        writeUpdate,
+                        writeVerifyUnique,
+                        writeOptimisticVerify,writeCommit,rollback,markStart,markCommit,entitySerializationStrategy,uniqueValueSerializationStrategy,mvccLogEntrySerializationStrategy,keyspace,config,entityVersionCleanupFactory,entityVersionCreatedFactory,entityDeletedFactory,taskExecutor,collectionScope);
+                }
+            } );
+
+
+
+    public EntityCollectionManagerFactoryImpl( @Write final WriteStart writeStart,
+                                               @WriteUpdate final WriteStart              writeUpdate,
+                                               final WriteUniqueVerify writeVerifyUnique,
+                                               final WriteOptimisticVerify writeOptimisticVerify,
+                                               final WriteCommit writeCommit,
+                                               final RollbackAction rollback,
+                                               final MarkStart markStart,
+                                               final MarkCommit markCommit,
+                                               @ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy,
+                                               final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
+                                               final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
+                                               final Keyspace keyspace,
+                                               final SerializationFig config,
+                                               final EntityVersionCleanupFactory entityVersionCleanupFactory,
+                                               final EntityVersionCreatedFactory          entityVersionCreatedFactory,
+                                               final EntityDeletedFactory                 entityDeletedFactory,
+                                               @CollectionTaskExecutor final TaskExecutor taskExecutor,
+                                               @Assisted final CollectionScope            collectionScope){
+
+        this.writeStart = writeStart;
+        this.writeUpdate = writeUpdate;
+        this.writeVerifyUnique = writeVerifyUnique;
+        this.writeOptimisticVerify = writeOptimisticVerify;
+        this.writeCommit = writeCommit;
+        this.rollback = rollback;
+        this.markStart = markStart;
+        this.markCommit = markCommit;
+        this.entitySerializationStrategy = entitySerializationStrategy;
+        this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
+        this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
+        this.keyspace = keyspace;
+        this.config = config;
+        this.entityVersionCleanupFactory = entityVersionCleanupFactory;
+        this.entityVersionCreatedFactory = entityVersionCreatedFactory;
+        this.entityDeletedFactory = entityDeletedFactory;
+        this.taskExecutor = taskExecutor;
+        this.collectionScope = collectionScope;
+    }
+    @Override
+    public EntityCollectionManager createCollectionManager(CollectionScope collectionScope) {
+        try{
+            return ecmCache.get(collectionScope);
+        }catch (ExecutionException ee){
+            throw new RuntimeException(ee);
+        }
+    }
+
+    @Override
+    public EntityCollectionManagerSync createCollectionManagerSync(CollectionScope collectionScope) {
+        return new EntityCollectionManagerSyncImpl(this,collectionScope);
+    }
+
+    @Override
+    public void invalidate() {
+        ecmCache.invalidateAll();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 503b07d..11c5d44 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -33,7 +33,7 @@ import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.VersionSet;
 import org.apache.usergrid.persistence.collection.guice.Write;
 import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtils;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
index 9ff4f56..f7d5b58 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
@@ -24,7 +24,7 @@ import com.netflix.astyanax.MutationBatch;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
 import org.apache.usergrid.persistence.collection.event.EntityDeleted;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -56,13 +56,13 @@ public class EntityDeletedTask implements Task<Void> {
 
 
     @Inject
-    public EntityDeletedTask( 
+    public EntityDeletedTask(
         EntityVersionCleanupFactory             entityVersionCleanupFactory,
         final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
         @ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy,
         final Set<EntityDeleted>                listeners, // MUST be a set or Guice will not inject
-        @Assisted final CollectionScope         collectionScope, 
-        @Assisted final Id                      entityId, 
+        @Assisted final CollectionScope         collectionScope,
+        @Assisted final Id                      entityId,
         @Assisted final UUID                    version) {
 
         this.entityVersionCleanupFactory = entityVersionCleanupFactory;
@@ -81,7 +81,7 @@ public class EntityDeletedTask implements Task<Void> {
                 new Object[] { collectionScope, entityId, version }, throwable );
     }
 
-    
+
     @Override
     public Void rejected() {
         try {
@@ -94,9 +94,9 @@ public class EntityDeletedTask implements Task<Void> {
         return null;
     }
 
-    
+
     @Override
-    public Void call() throws Exception { 
+    public Void call() throws Exception {
 
         entityVersionCleanupFactory.getTask( collectionScope, entityId, version ).call();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index efecdeb..2f51eb5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
@@ -55,7 +55,7 @@ import rx.schedulers.Schedulers;
 
 
 /**
- * Cleans up previous versions from the specified version. Note that this means the version 
+ * Cleans up previous versions from the specified version. Note that this means the version
  * passed in the io event is retained, the range is exclusive.
  */
 public class EntityVersionCleanupTask implements Task<Void> {
@@ -77,7 +77,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
 
 
     @Inject
-    public EntityVersionCleanupTask( 
+    public EntityVersionCleanupTask(
         final SerializationFig serializationFig,
         final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
         @ProxyImpl final MvccEntitySerializationStrategy   entitySerializationStrategy,
@@ -159,7 +159,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
                                 continue;
                             }
                             final UniqueValue unique = new UniqueValueImpl( field, entityId, entityVersion);
-                            final MutationBatch deleteMutation = 
+                            final MutationBatch deleteMutation =
                                     uniqueValueSerializationStrategy.delete(scope,unique);
                             batch.mergeShallow(deleteMutation);
                         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
new file mode 100644
index 0000000..0a3cd3a
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
@@ -0,0 +1,30 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.collection.mvcc;
+
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.core.migration.schema.MigrationStrategy;
+
+/**
+ * Classy class class.
+ */
+public interface MvccEntityMigrationStrategy extends MigrationStrategy<MvccEntitySerializationStrategy> {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java
deleted file mode 100644
index 8a13115..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.collection.mvcc;
-
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.core.migration.schema.Migration;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.netflix.astyanax.MutationBatch;
-
-
-/**
- * The interface that allows us to serialize an entity to disk
- */
-public interface MvccEntitySerializationStrategy extends Migration {
-
-    /**
-     * Serialize the entity to the data store with the given collection context
-     *
-     * @param entity The entity to persist
-     *
-     * @return The MutationBatch operations for this update
-     */
-    public MutationBatch write( CollectionScope context, MvccEntity entity );
-
-
-
-    /**
-     * Load the entities into the entitySet from the specified Ids.  Loads versions <= the maxVersion
-     * @param scope
-     * @param entityIds
-     * @return
-     */
-    public EntitySet load( CollectionScope scope, Collection<Id> entityIds, UUID maxVersion);
-
-    /**
-     * Load a list, from highest to lowest of the entity with versions <= version up to maxSize elements
-     *
-     * @param context The context to persist the entity into
-     * @param entityId The entity id to load
-     * @param version The max version to seek from.  I.E a stored version <= this argument
-     * @param fetchSize The fetch size to return for each trip to cassandra.
-     *
-     * @return An iterator of entities ordered from max(UUID)=> min(UUID).  The return value should be null
-     *         safe and return an empty list when there are no matches
-     */
-    public Iterator<MvccEntity> loadDescendingHistory( CollectionScope context, Id entityId, UUID version,
-                                                       int fetchSize );
-
-    /**
-     * Load a historical list of entities, from lowest to highest entity with versions < version up to maxSize elements
-     *
-     * @param context The context to persist the entity into
-     * @param entityId The entity id to load
-     * @param version The max version to seek to.  I.E a stored version < this argument
-     * @param fetchSize The fetch size to return for each trip to cassandra.
-     * @return An iterator of entities ordered from min(UUID)=> max(UUID).  The return value should be null
-     *         safe and return an empty list when there are no matches
-     */
-    public Iterator<MvccEntity> loadAscendingHistory( CollectionScope context, Id entityId, UUID version,
-                                                      int fetchSize );
-
-    /**
-     * Mark this  this version as deleted from the persistence store, but keep the version to mark that is has been cleared This
-     * can be used in a mark+sweep system.  The entity with the given version will exist in the context, but no data
-     * will be stored
-     */
-    public MutationBatch mark( CollectionScope context, Id entityId, UUID version );
-
-
-    /**
-     * Delete the entity from the context with the given entityId and version
-     *
-     * @param context The context that contains the entity
-     * @param entityId The entity id to delete
-     * @param version The version to delete
-     */
-    public MutationBatch delete( CollectionScope context, Id entityId, UUID version );
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
index baf2ac3..380bf15 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
@@ -25,7 +25,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index d3c8193..8604af6 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.exception.WriteCommitException;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
@@ -53,7 +53,7 @@ import rx.functions.Func1;
 
 
 /**
- * This phase should invoke any finalization, and mark the entity as committed in the 
+ * This phase should invoke any finalization, and mark the entity as committed in the
  * data store before returning
  */
 @Singleton

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityRepairImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityRepairImpl.java
index 4226fe6..1c7909b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityRepairImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityRepairImpl.java
@@ -24,7 +24,6 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLog;
 import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGenerator;
 import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGeneratorImpl;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategy.java
new file mode 100644
index 0000000..bf1422b
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategy.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.MutationBatch;
+
+
+/**
+ * The interface that allows us to serialize an entity to disk
+ */
+public interface MvccEntitySerializationStrategy extends Migration {
+
+    /**
+     * Serialize the entity to the data store with the given collection context
+     *
+     * @param entity The entity to persist
+     * @return The MutationBatch operations for this update
+     */
+    public MutationBatch write(CollectionScope context, MvccEntity entity);
+
+
+    /**
+     * Load the entities into the entitySet from the specified Ids.  Loads versions <= the maxVersion
+     *
+     * @param scope
+     * @param entityIds
+     * @return
+     */
+    public EntitySet load(CollectionScope scope, Collection<Id> entityIds, UUID maxVersion);
+
+    /**
+     * Load a list, from highest to lowest of the entity with versions <= version up to maxSize elements
+     *
+     * @param context   The context to persist the entity into
+     * @param entityId  The entity id to load
+     * @param version   The max version to seek from.  I.E a stored version <= this argument
+     * @param fetchSize The fetch size to return for each trip to cassandra.
+     * @return An iterator of entities ordered from max(UUID)=> min(UUID).  The return value should be null
+     * safe and return an empty list when there are no matches
+     */
+    public Iterator<MvccEntity> loadDescendingHistory(CollectionScope context, Id entityId, UUID version,
+                                                      int fetchSize);
+
+    /**
+     * Load a historical list of entities, from lowest to highest entity with versions < version up to maxSize elements
+     *
+     * @param context   The context to persist the entity into
+     * @param entityId  The entity id to load
+     * @param version   The max version to seek to.  I.E a stored version < this argument
+     * @param fetchSize The fetch size to return for each trip to cassandra.
+     * @return An iterator of entities ordered from min(UUID)=> max(UUID).  The return value should be null
+     * safe and return an empty list when there are no matches
+     */
+    public Iterator<MvccEntity> loadAscendingHistory(CollectionScope context, Id entityId, UUID version,
+                                                     int fetchSize);
+
+    /**
+     * Mark this  this version as deleted from the persistence store, but keep the version to mark that is has been cleared This
+     * can be used in a mark+sweep system.  The entity with the given version will exist in the context, but no data
+     * will be stored
+     */
+    public MutationBatch mark(CollectionScope context, Id entityId, UUID version);
+
+
+    /**
+     * Delete the entity from the context with the given entityId and version
+     *
+     * @param context  The context that contains the entity
+     * @param entityId The entity id to delete
+     * @param version  The version to delete
+     */
+    public MutationBatch delete(CollectionScope context, Id entityId, UUID version);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
index 6badbc1..3d36438 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
@@ -37,7 +37,6 @@ import org.apache.usergrid.persistence.collection.EntitySet;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
 import org.apache.usergrid.persistence.collection.exception.DataCorruptionException;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
 import org.apache.usergrid.persistence.collection.serialization.EntityRepair;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java
index f52c16d..bb192cd 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java
@@ -19,18 +19,25 @@ package org.apache.usergrid.persistence.collection.serialization.impl;
 import com.google.inject.Inject;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntitySet;
 import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.migration.schema.MigrationStrategy;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import rx.Observable;
+import rx.functions.Func1;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
 
 
 /**
@@ -38,13 +45,13 @@ import java.util.UUID;
  * migration data goes to both sources and is read from the old source. After the ugprade completes,
  * it will be available from the new source
  */
-public abstract class MvccEntitySerializationStrategyProxy implements MvccEntitySerializationStrategy {
+public abstract class MvccEntitySerializationStrategyProxy implements MvccEntitySerializationStrategy, MvccEntityMigrationStrategy {
 
 
     private final DataMigrationManager dataMigrationManager;
-    private final Keyspace keyspace;
-    private final MvccEntitySerializationStrategy previous;
-    private final MvccEntitySerializationStrategy current;
+    protected final Keyspace keyspace;
+    protected final MvccEntitySerializationStrategy previous;
+    protected final MvccEntitySerializationStrategy current;
 
 
     @Inject
@@ -135,13 +142,11 @@ public abstract class MvccEntitySerializationStrategyProxy implements MvccEntity
         return current.delete( context, entityId, version );
     }
 
-    public abstract int getMigrationVersion();
-
     /**
      * Return true if we're on an old version
      */
     private boolean isOldVersion() {
-        return dataMigrationManager.getCurrentVersion() < getMigrationVersion();
+        return dataMigrationManager.getCurrentVersion() < getVersion();
     }
 
 
@@ -149,5 +154,74 @@ public abstract class MvccEntitySerializationStrategyProxy implements MvccEntity
     public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
         return Collections.emptyList();
     }
+
+    @Override
+    public Observable migrate(final ApplicationEntityGroup applicationEntityGroup, final DataMigration.ProgressObserver observer) {
+        final AtomicLong atomicLong = new AtomicLong();
+        final MutationBatch totalBatch = keyspace.prepareMutationBatch();
+
+        final List<Id> entityIds = applicationEntityGroup.entityIds;
+
+        final UUID now = UUIDGenerator.newTimeUUID();
+
+        //go through each entity in the system, and load it's entire
+        // history
+        return Observable.from(entityIds)
+
+            .map(new Func1<Id, Id>() {
+                @Override
+                public Id call(Id entityId) {
+
+                    ApplicationScope applicationScope = applicationEntityGroup.applicationScope;
+
+                    if (!(applicationScope instanceof CollectionScope)) {
+                        throw new IllegalArgumentException("getCollectionScopeFromEntityId must return a collection scope");
+                    }
+
+                    CollectionScope currentScope = (CollectionScope) applicationScope;
+                    MigrationStrategy.MigrationRelationship<MvccEntitySerializationStrategy> migration = getMigration();
+                    //for each element in the history in the previous version,
+                    // copy it to the CF in v2
+                    Iterator<MvccEntity> allVersions = migration.from()
+                        .loadDescendingHistory(currentScope, entityId, now,
+                            1000);
+
+                    while (allVersions.hasNext()) {
+                        final MvccEntity version = allVersions.next();
+
+                        final MutationBatch versionBatch =
+                            migration.to().write(currentScope, version);
+
+                        totalBatch.mergeShallow(versionBatch);
+
+                        if (atomicLong.incrementAndGet() % 50 == 0) {
+                            executeBatch(totalBatch, observer, atomicLong);
+                        }
+                    }
+                    executeBatch(totalBatch, observer, atomicLong);
+                    return entityId;
+                }
+            })
+            .map(new Func1<Id, Long>() {
+                @Override
+                public Long call(Id id) {
+                    executeBatch(totalBatch, observer, atomicLong);
+                    return atomicLong.get();
+                }
+            });
+    }
+
+    protected void executeBatch( final MutationBatch batch, final DataMigration.ProgressObserver po, final AtomicLong count ) {
+        try {
+            batch.execute();
+
+            po.update( getVersion(), "Finished copying " + count + " entities to the new format" );
+        }
+        catch ( ConnectionException e ) {
+            po.failed( getVersion(), "Failed to execute mutation in cassandra" );
+            throw new DataMigrationException( "Unable to migrate batches ", e );
+        }
+    }
+
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java
index defb966..7be7f70 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java
@@ -20,25 +20,20 @@
 package org.apache.usergrid.persistence.collection.serialization.impl;
 
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
 import org.apache.usergrid.persistence.core.guice.V1Impl;
 import org.apache.usergrid.persistence.core.guice.V2Impl;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
-import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+import rx.Observable;
+import rx.functions.Func1;
 
 
 /**
@@ -47,9 +42,8 @@ import com.netflix.astyanax.MutationBatch;
  * it will be available from the new source
  */
 @Singleton
-public class MvccEntitySerializationStrategyProxyV1Impl extends MvccEntitySerializationStrategyProxy {
+public class MvccEntitySerializationStrategyProxyV1Impl extends MvccEntitySerializationStrategyProxy implements MvccEntityMigrationStrategy {
 
-    public static final int MIGRATION_VERSION = 3;
 
     @Inject
     public MvccEntitySerializationStrategyProxyV1Impl(final DataMigrationManager dataMigrationManager,
@@ -60,7 +54,13 @@ public class MvccEntitySerializationStrategyProxyV1Impl extends MvccEntitySerial
     }
 
     @Override
-    public int getMigrationVersion() {
-        return MIGRATION_VERSION;
+    public MigrationRelationship<MvccEntitySerializationStrategy> getMigration() {
+        return new MigrationRelationship<>(previous,current);
     }
+
+    @Override
+    public int getVersion() {
+        return V2Impl.MIGRATION_VERSION;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java
index eec9188..6b24ac7 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java
@@ -3,20 +3,10 @@ package org.apache.usergrid.persistence.collection.serialization.impl;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
 import org.apache.usergrid.persistence.core.guice.*;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
-import org.apache.usergrid.persistence.model.entity.Id;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.UUID;
 
 /**
  * Version 4 implementation of entity serialization. This will proxy writes and reads so that during
@@ -24,9 +14,8 @@ import java.util.UUID;
  * it will be available from the new source
  */
 @Singleton
-public class MvccEntitySerializationStrategyProxyV2Impl extends MvccEntitySerializationStrategyProxy {
+public class MvccEntitySerializationStrategyProxyV2Impl extends MvccEntitySerializationStrategyProxy implements MvccEntityMigrationStrategy {
 
-    public static final int MIGRATION_VERSION = 4;
 
     @Inject
     public MvccEntitySerializationStrategyProxyV2Impl( final DataMigrationManager dataMigrationManager,
@@ -37,8 +26,16 @@ public class MvccEntitySerializationStrategyProxyV2Impl extends MvccEntitySerial
     }
 
     @Override
-    public int getMigrationVersion() {
-        return MIGRATION_VERSION;
+    public int getVersion() {
+        return V3Impl.MIGRATION_VERSION;
     }
 
+
+    @Override
+    public MigrationRelationship<MvccEntitySerializationStrategy> getMigration() {
+        return new MigrationRelationship<>(this.previous,this.current);
+    }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
index 4a04ee4..96ac428 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
@@ -1,30 +1,22 @@
 package org.apache.usergrid.persistence.collection.serialization.impl;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.serializers.AbstractSerializer;
 import com.netflix.astyanax.serializers.UUIDSerializer;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.EntitySet;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.exception.DataCorruptionException;
 import org.apache.usergrid.persistence.collection.exception.EntityTooLargeException;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.core.astyanax.*;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.EntityMap;
 import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.field.value.EntityObject;
 
 import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Iterator;
 import java.util.UUID;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ed0221f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
index 08b0c91..cf55eca 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
@@ -18,10 +18,11 @@
 package org.apache.usergrid.persistence.collection.serialization.impl;
 
 
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.core.guice.*;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
 
 import com.google.inject.AbstractModule;
@@ -46,15 +47,19 @@ public class SerializationModule extends AbstractModule {
         bind( MvccEntitySerializationStrategy.class ).annotatedWith( V2Impl.class )
                                                      .to(MvccEntitySerializationStrategyV2Impl.class);
         bind( MvccEntitySerializationStrategy.class ).annotatedWith( V3Impl.class )
-                                                    .to(MvccEntitySerializationStrategyV3Impl.class);
-        bind( MvccEntitySerializationStrategy.class ).annotatedWith( PreviousImpl.class )
-                                                    .to(MvccEntitySerializationStrategyProxyV2Impl.class);
-        bind( MvccEntitySerializationStrategy.class ).annotatedWith( CurrentImpl.class )
-                                                    .to(MvccEntitySerializationStrategyV3Impl.class);
+                                                     .to(MvccEntitySerializationStrategyV3Impl.class);
+
         bind(MvccEntitySerializationStrategy.class).annotatedWith( V1ProxyImpl.class )
-                                                    .to( MvccEntitySerializationStrategyProxyV1Impl.class );
-        bind( MvccEntitySerializationStrategy.class ).annotatedWith( ProxyImpl.class )
-                                                    .to( MvccEntitySerializationStrategyProxyV2Impl.class );
+                                                     .to(MvccEntitySerializationStrategyProxyV1Impl.class);
+        bind(MvccEntitySerializationStrategy.class ).annotatedWith( ProxyImpl.class )
+                                                     .to(MvccEntitySerializationStrategyProxyV2Impl.class);
+
+        Multibinder<DataMigration> dataMigrationMultibinder =
+            Multibinder.newSetBinder( binder(), DataMigration.class );
+        dataMigrationMultibinder.addBinding().to( MvccEntitySerializationStrategyProxyV2Impl.class );
+
+        bind( MvccEntityMigrationStrategy.class ).to(MvccEntitySerializationStrategyProxyV2Impl.class);
+
         bind( MvccLogEntrySerializationStrategy.class ).to( MvccLogEntrySerializationStrategyImpl.class );
         bind( UniqueValueSerializationStrategy.class ).to( UniqueValueSerializationStrategyImpl.class );