You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/11/21 22:35:52 UTC

[14/19] incubator-usergrid git commit: Finished migration testing. All core tests pass.

Finished migration testing.  All core tests pass.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e32d5212
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e32d5212
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e32d5212

Branch: refs/heads/two-dot-o
Commit: e32d5212af3c53cc84c9a505cb9bb6824e6a16f2
Parents: 364a6ea
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Nov 20 15:48:19 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 20 15:48:19 2014 -0700

----------------------------------------------------------------------
 .../migration/EntityDataMigration.java          |  90 ++++---
 .../rx/ApplicationObservable.java               |  65 +++--
 .../migration/EntityDataMigrationIT.java        | 251 +++++++++++++++++++
 .../migration/GraphShardVersionMigrationIT.java | 131 +++++-----
 4 files changed, 394 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e32d5212/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
index 79d31a8..767574d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
@@ -22,9 +22,7 @@ package org.apache.usergrid.corepersistence.migration;
 
 import java.util.Iterator;
 import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.usergrid.corepersistence.ManagerCache;
 import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
@@ -53,9 +51,6 @@ import rx.functions.Action1;
 public class EntityDataMigration implements DataMigration {
 
 
-    private static final Logger logger = LoggerFactory.getLogger( EntityDataMigration.class );
-
-
     private final MvccEntitySerializationStrategy v1Serialization;
     private final MvccEntitySerializationStrategy v2Serialization;
 
@@ -77,58 +72,71 @@ public class EntityDataMigration implements DataMigration {
     @Override
     public void migrate( final ProgressObserver observer ) throws Throwable {
 
+        final AtomicLong atomicLong = new AtomicLong();
+
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
 
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).doOnNext(
-                new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
 
+                                                 @Override
+                                                 public void call(
+                                                         final AllEntitiesInSystemObservable.ApplicationEntityGroup
+                                                                 applicationEntityGroup ) {
 
-                    @Override
-                    public void call(
-                            final AllEntitiesInSystemObservable.ApplicationEntityGroup applicationEntityGroup ) {
 
+                                                     final UUID now = UUIDGenerator.newTimeUUID();
 
-                        final UUID now = UUIDGenerator.newTimeUUID();
+                                                     final Id appScopeId =
+                                                             applicationEntityGroup.applicationScope.getApplication();
 
-                        final Id appScopeId = applicationEntityGroup.applicationScope.getApplication();
 
+                                                     final MutationBatch totalBatch = keyspace.prepareMutationBatch();
 
-                        final MutationBatch totalBatch = keyspace.prepareMutationBatch();
+                                                     //go through each entity in the system, and load it's entire
+                                                     // history
+                                                     for ( Id entityId : applicationEntityGroup.entityIds ) {
 
-                        for ( Id entityId : applicationEntityGroup.entityIds ) {
+                                                         CollectionScope currentScope = CpNamingUtils
+                                                                 .getCollectionScopeNameFromEntityType( appScopeId,
+                                                                         entityId.getType() );
 
-                            CollectionScope currentScope = CpNamingUtils.getCollectionScopeNameFromEntityType(
-                                    appScopeId, entityId.getType() );
 
+                                                         //for each element in the history in the previous version,
+                                                         // copy it to the CF in v2
+                                                         Iterator<MvccEntity> allVersions = v1Serialization
+                                                                 .loadDescendingHistory( currentScope, entityId, now,
+                                                                         1000 );
 
-                            Iterator<MvccEntity> allVersions =
-                                    v1Serialization.loadDescendingHistory( currentScope, entityId, now, 1000 );
+                                                         while ( allVersions.hasNext() ) {
+                                                             final MvccEntity version = allVersions.next();
 
-                            while ( allVersions.hasNext() ) {
-                                final MvccEntity version = allVersions.next();
+                                                             final MutationBatch versionBatch =
+                                                                     v2Serialization.write( currentScope, version );
 
-                                final MutationBatch versionBatch = v2Serialization.write( currentScope, version );
+                                                             totalBatch.mergeShallow( versionBatch );
+
+                                                             if ( atomicLong.incrementAndGet() % 50 == 0 ) {
+                                                                 executeBatch( totalBatch, observer, atomicLong );
+                                                             }
+                                                         }
+                                                     }
+
+                                                     executeBatch( totalBatch, observer, atomicLong );
+                                                 }
+                                             } ).toBlocking().last();
+    }
 
-                                totalBatch.mergeShallow( versionBatch );
 
-                                if ( totalBatch.getRowCount() >= 50 ) {
-                                    try {
-                                        totalBatch.execute();
-                                    }
-                                    catch ( ConnectionException e ) {
-                                        throw new DataMigrationException( "Unable to migrate batches ", e );
-                                    }
-                                }
-                            }
-                        }
+    private void executeBatch( final MutationBatch batch, final ProgressObserver po, final AtomicLong count ) {
+        try {
+            batch.execute();
 
-                        try {
-                            totalBatch.execute();
-                        }
-                        catch ( ConnectionException e ) {
-                            throw new DataMigrationException( "Unable to migrate batches ", e );
-                        }
-                    }
-                } ).toBlocking().last();
+            po.update( getVersion(), "Finished copying " + count + " entities to the new format" );
+        }
+        catch ( ConnectionException e ) {
+            po.failed( getVersion(), "Failed to execute mutation in cassandra" );
+            throw new DataMigrationException( "Unable to migrate batches ", e );
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e32d5212/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
index 898812b..6019bca 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
@@ -23,18 +23,20 @@ package org.apache.usergrid.corepersistence.rx;
 import java.util.Arrays;
 import java.util.UUID;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.corepersistence.ManagerCache;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import rx.Observable;
@@ -47,14 +49,12 @@ import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicat
 /**
  * An observable that will emit all application stored in the system.
  */
-public class ApplicationObservable  {
-
+public class ApplicationObservable {
 
+    private static final Logger logger = LoggerFactory.getLogger( ApplicationObservable.class );
 
     /**
      * Get all applicationIds as an observable
-     * @param managerCache
-     * @return
      */
     public static Observable<Id> getAllApplicationIds( final ManagerCache managerCache ) {
 
@@ -62,22 +62,20 @@ public class ApplicationObservable  {
         //this way consumers can perform whatever work they need to on the root system first
 
 
-       final Observable<Id> systemIds =  Observable.from( Arrays.asList( generateApplicationId( CpNamingUtils.DEFAULT_APPLICATION_ID ),
-                generateApplicationId( CpNamingUtils.MANAGEMENT_APPLICATION_ID ),
-                generateApplicationId( CpNamingUtils.SYSTEM_APP_ID ) ) );
-
-
+        final Observable<Id> systemIds = Observable.from( Arrays
+                .asList( generateApplicationId( CpNamingUtils.DEFAULT_APPLICATION_ID ),
+                        generateApplicationId( CpNamingUtils.MANAGEMENT_APPLICATION_ID ),
+                        generateApplicationId( CpNamingUtils.SYSTEM_APP_ID ) ) );
 
 
         final ApplicationScope appScope = getApplicationScope( CpNamingUtils.SYSTEM_APP_ID );
 
-        final CollectionScope appInfoCollectionScope = new CollectionScopeImpl(
-                appScope.getApplication(),
-                appScope.getApplication(),
-                CpNamingUtils.getCollectionScopeNameFromCollectionName( CpNamingUtils.APPINFOS ));
+        final CollectionScope appInfoCollectionScope =
+                new CollectionScopeImpl( appScope.getApplication(), appScope.getApplication(),
+                        CpNamingUtils.getCollectionScopeNameFromCollectionName( CpNamingUtils.APPINFOS ) );
 
-        final EntityCollectionManager
-                collectionManager = managerCache.getEntityCollectionManager( appInfoCollectionScope );
+        final EntityCollectionManager collectionManager =
+                managerCache.getEntityCollectionManager( appInfoCollectionScope );
 
 
         final GraphManager gm = managerCache.getGraphManager( appScope );
@@ -97,21 +95,34 @@ public class ApplicationObservable  {
                 //get the app info and load it
                 final Id appInfo = edge.getTargetNode();
 
-                return collectionManager.load( appInfo ).map( new Func1<org.apache.usergrid.persistence.model.entity.Entity, Id>() {
+                return collectionManager.load( appInfo )
+                        //filter out null entities
+                        .filter( new Func1<Entity, Boolean>() {
+                            @Override
+                            public Boolean call( final Entity entity ) {
+                                if ( entity == null ) {
+                                    logger.warn( "Encountered a null application info for id {}", appInfo );
+                                    return false;
+                                }
 
+                                return true;
+                            }
+                        } )
+                                //get the id from the entity
+                        .map( new Func1<org.apache.usergrid.persistence.model.entity.Entity, Id>() {
 
-                    @Override
-                    public Id call( final org.apache.usergrid.persistence.model.entity.Entity entity ) {
-                        final UUID  uuid = (UUID )entity.getField( "applicationUuid" ).getValue();
 
-                        return CpNamingUtils.generateApplicationId(uuid);
-                    }
-                } );
+                            @Override
+                            public Id call( final org.apache.usergrid.persistence.model.entity.Entity entity ) {
+
+                                final UUID uuid = ( UUID ) entity.getField( "applicationUuid" ).getValue();
+
+                                return CpNamingUtils.generateApplicationId( uuid );
+                            }
+                        } );
             }
         } );
 
-        return Observable.merge( systemIds, appIds);
+        return Observable.merge( systemIds, appIds );
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e32d5212/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
new file mode 100644
index 0000000..5c9e14c
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.migration;
+
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.usergrid.AbstractCoreIT;
+import org.apache.usergrid.corepersistence.CpSetup;
+import org.apache.usergrid.corepersistence.EntityWriteHelper;
+import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.SimpleEntityRef;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.core.guice.CurrentImpl;
+import org.apache.usergrid.persistence.core.guice.PreviousImpl;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Injector;
+import com.google.inject.Key;
+
+import rx.functions.Action1;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class EntityDataMigrationIT extends AbstractCoreIT {
+
+
+    private Injector injector;
+
+
+    private EntityDataMigration entityDataMigration;
+    private ManagerCache managerCache;
+    private DataMigrationManager dataMigrationManager;
+    private MigrationInfoSerialization migrationInfoSerialization;
+    private MvccEntitySerializationStrategy v1Strategy;
+    private MvccEntitySerializationStrategy v2Strategy;
+    private EntityManagerFactory emf;
+
+
+    @Before
+    public void setup() {
+        emf = setup.getEmf();
+        injector = CpSetup.getInjector();
+        entityDataMigration = injector.getInstance( EntityDataMigration.class );
+        managerCache = injector.getInstance( ManagerCache.class );
+        dataMigrationManager = injector.getInstance( DataMigrationManager.class );
+        migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
+        v1Strategy = injector.getInstance( Key.get(MvccEntitySerializationStrategy.class, PreviousImpl.class) );
+        v2Strategy = injector.getInstance( Key.get(MvccEntitySerializationStrategy.class, CurrentImpl.class) );
+    }
+
+
+    @Test
+    public void testDataMigration() throws Throwable {
+
+        assertEquals( "version 3 expected", 3, entityDataMigration.getVersion() );
+
+
+        /**
+         * Reset to our version -1 and start the migration
+         */
+        dataMigrationManager.resetToVersion( entityDataMigration.getVersion() - 1 );
+
+
+        final EntityManager newAppEm = app.getEntityManager();
+
+        final String type1 = "type1thing";
+        final String type2 = "type2thing";
+        final int size = 10;
+
+        final Set<Id> type1Identities = EntityWriteHelper.createTypes( newAppEm, type1, size );
+        final Set<Id> type2Identities = EntityWriteHelper.createTypes( newAppEm, type2, size );
+
+
+        final Set<Id> createdEntityIds = new HashSet<>();
+        createdEntityIds.addAll( type1Identities );
+        createdEntityIds.addAll( type2Identities );
+
+
+        final TestProgressObserver progressObserver = new TestProgressObserver();
+
+
+        //load everything that appears in v1, migrate and ensure it appears in v2
+        final Set<MvccEntity> savedEntities = new HashSet<>( 10000 );
+        //set that holds all entityIds for later assertion
+        final Set<Id> entityIds = new HashSet<>(10000);
+
+
+        //read everything in previous version format and put it into our types.  Assumes we're
+        //using a test system, and it's not a huge amount of data, otherwise we'll overflow.
+
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+                                         @Override
+                                         public void call(
+                                                 final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
+
+                                             //add all versions from history to our comparison
+                                             for ( final Id id : entity.entityIds ) {
+
+                                                 CollectionScope scope = CpNamingUtils
+                                                         .getCollectionScopeNameFromEntityType(
+                                                                 entity.applicationScope.getApplication(),
+                                                                 id.getType() );
+
+                                                 final Iterator<MvccEntity> versions = v1Strategy
+                                                         .loadDescendingHistory( scope, id, UUIDGenerator.newTimeUUID(),
+                                                                 100 );
+
+                                                 while ( versions.hasNext() ) {
+
+                                                     final MvccEntity mvccEntity = versions.next();
+
+                                                     savedEntities.add( mvccEntity );
+
+                                                     createdEntityIds.remove( mvccEntity.getId() );
+
+                                                     entityIds.add( id );
+                                                 }
+                                             }
+                                         }
+                                     } ).toBlocking().lastOrDefault( null );
+
+        assertEquals( "Newly saved entities encountered", 0, createdEntityIds.size() );
+        assertTrue( "Saved new entities", savedEntities.size() > 0 );
+
+        //perform the migration
+        entityDataMigration.migrate( progressObserver );
+
+        assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
+        assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
+
+
+        //write the status and version, then invalidate the cache so it appears
+        migrationInfoSerialization.setStatusCode( DataMigrationManagerImpl.StatusCode.COMPLETE.status );
+        migrationInfoSerialization.setVersion( entityDataMigration.getVersion() );
+        dataMigrationManager.invalidate();
+
+        assertEquals( "New version saved, and we should get new implementation", entityDataMigration.getVersion(),
+                dataMigrationManager.getCurrentVersion() );
+
+
+        //now visit all entities in the system again, load them from v2, and ensure they're the same
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+                                                    @Override
+                                                    public void call(
+                                                            final AllEntitiesInSystemObservable
+                                                                    .ApplicationEntityGroup entity ) {
+                                                        //add all versions from history to our comparison
+                                                        for ( final Id id : entity.entityIds ) {
+
+                                                            CollectionScope scope = CpNamingUtils
+                                                                    .getCollectionScopeNameFromEntityType(
+                                                                            entity.applicationScope.getApplication(),
+                                                                            id.getType() );
+
+                                                            final Iterator<MvccEntity> versions = v2Strategy
+                                                                    .loadDescendingHistory( scope, id,
+                                                                            UUIDGenerator.newTimeUUID(), 100 );
+
+                                                            while ( versions.hasNext() ) {
+
+                                                                final MvccEntity mvccEntity = versions.next();
+
+                                                                savedEntities.remove( mvccEntity );
+                                                            }
+                                                        }
+                                                    }
+                                                }
+
+
+                                              ).toBlocking().lastOrDefault( null );
+
+
+        assertEquals( "All entities migrated", 0, savedEntities.size() );
+
+
+        //now visit all entities in the system again, and load them from the EM, ensure we see everything we did in the v1 traversal
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+                                                    @Override
+                                                    public void call(
+                                                            final AllEntitiesInSystemObservable
+                                                                    .ApplicationEntityGroup entity ) {
+
+                                                        final EntityManager em = emf.getEntityManager( entity.applicationScope.getApplication().getUuid() );
+
+                                                        //add all versions from history to our comparison
+                                                        for ( final Id id : entity.entityIds ) {
+
+
+                                                            try {
+                                                                final Entity emEntity = em.get( SimpleEntityRef.fromId( id ) );
+
+                                                                if(emEntity != null){
+                                                                    entityIds.remove( id );
+                                                                }
+                                                            }
+                                                            catch ( Exception e ) {
+                                                                throw new RuntimeException("Error loading entity", e);
+                                                            }
+                                                        }
+                                                    }
+                                                }
+
+
+                                              ).toBlocking().lastOrDefault( null );
+
+
+        assertEquals("All entities could be loaded by the entity manager", 0, entityIds.size());
+
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e32d5212/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
index 88c02cd..51ea052 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
@@ -42,7 +42,6 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.inject.Injector;
-import com.netflix.astyanax.Keyspace;
 
 import rx.functions.Action1;
 
@@ -55,7 +54,6 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
 
     private Injector injector;
     private GraphShardVersionMigration graphShardVersionMigration;
-    private Keyspace keyspace;
     private ManagerCache managerCache;
     private DataMigrationManager dataMigrationManager;
     private MigrationInfoSerialization migrationInfoSerialization;
@@ -65,7 +63,6 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
     public void setup() {
         injector = CpSetup.getInjector();
         graphShardVersionMigration = injector.getInstance( GraphShardVersionMigration.class );
-        keyspace = injector.getInstance( Keyspace.class );
         managerCache = injector.getInstance( ManagerCache.class );
         dataMigrationManager = injector.getInstance( DataMigrationManager.class );
         migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
@@ -75,15 +72,12 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
     @Test
     public void testIdMapping() throws Throwable {
 
-        assertEquals("version 2 expected", 2, graphShardVersionMigration.getVersion());
+        assertEquals( "version 2 expected", 2, graphShardVersionMigration.getVersion() );
 
         /**
          * Reset to our version -1 and start the migration
          */
-        dataMigrationManager.resetToVersion( graphShardVersionMigration.getVersion()-1 );
-
-
-
+        dataMigrationManager.resetToVersion( graphShardVersionMigration.getVersion() - 1 );
 
 
         final EntityManager newAppEm = app.getEntityManager();
@@ -111,15 +105,16 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
 
         //read everything in previous version format and put it into our types.
 
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000)
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
                                      .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
                                          @Override
-                                         public void call( final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
+                                         public void call(
+                                                 final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
 
                                              final GraphManager gm =
                                                      managerCache.getGraphManager( entity.applicationScope );
 
-                                             for(final Id id: entity.entityIds) {
+                                             for ( final Id id : entity.entityIds ) {
                                                  /**
                                                   * Get our edge types from the source
                                                   */
@@ -135,8 +130,7 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
                                                  /**
                                                   * Get the edge types to the target
                                                   */
-                                                 gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( id,
-                                                         null, null ) )
+                                                 gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( id, null, null ) )
                                                    .doOnNext( new Action1<String>() {
                                                        @Override
                                                        public void call( final String s ) {
@@ -153,7 +147,7 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
         //perform the migration
         graphShardVersionMigration.migrate( progressObserver );
 
-        assertEquals("Newly saved entities encounterd", 0, allEntities.size());
+        assertEquals( "Newly saved entities encounterd", 0, allEntities.size() );
         assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
         assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
 
@@ -163,70 +157,57 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
         migrationInfoSerialization.setVersion( graphShardVersionMigration.getVersion() );
         dataMigrationManager.invalidate();
 
-        assertEquals("New version saved, and we should get new implementation", graphShardVersionMigration.getVersion(), dataMigrationManager.getCurrentVersion());
+        assertEquals( "New version saved, and we should get new implementation",
+                graphShardVersionMigration.getVersion(), dataMigrationManager.getCurrentVersion() );
 
 
         //now visit all nodes in the system and remove their types from the multi maps, it should be empty at the end
         AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
                                      .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
-                                         @Override
-                                         public void call(
-                                                 final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
-
-                                             final GraphManager gm =
-                                                     managerCache.getGraphManager( entity.applicationScope );
-
-                                             for ( final Id id : entity.entityIds ) {
-                                                 /**
-                                                  * Get our edge types from the source
-                                                  */
-                                                 gm.getEdgeTypesFromSource(
-                                                         new SimpleSearchEdgeType( id, null, null ) )
-                                                   .doOnNext( new Action1<String>() {
-                                                       @Override
-                                                       public void call( final String s ) {
-                                                           sourceTypes.remove( id, s );
-                                                       }
-                                                   } ).toBlocking().lastOrDefault( null );
-
-
-                                                 /**
-                                                  * Get the edge types to the target
-                                                  */
-                                                 gm.getEdgeTypesToTarget(
-                                                         new SimpleSearchEdgeType( id, null, null ) )
-                                                   .doOnNext( new Action1<String>() {
-                                                       @Override
-                                                       public void call( final String s ) {
-                                                           targetTypes.remove( id, s );
-                                                       }
-                                                   } ).toBlocking().lastOrDefault( null );
-                                             }
-                                             }
-                                         }
-
-
-                                         ).
-
-
-                                         toBlocking()
-
-
-                                         .
-
-
-                                         lastOrDefault( null );
-
-
-                                         assertEquals( "All source types migrated",0,sourceTypes.size( )
-
-
-                                         );
-
-
-                                         assertEquals( "All target types migrated",0,targetTypes.size( )
-
-
-                                         );
-                                     }
+                                                    @Override
+                                                    public void call(
+                                                            final AllEntitiesInSystemObservable
+                                                                    .ApplicationEntityGroup entity ) {
+
+                                                        final GraphManager gm =
+                                                                managerCache.getGraphManager( entity.applicationScope );
+
+                                                        for ( final Id id : entity.entityIds ) {
+                                                            /**
+                                                             * Get our edge types from the source
+                                                             */
+                                                            gm.getEdgeTypesFromSource(
+                                                                    new SimpleSearchEdgeType( id, null, null ) )
+                                                              .doOnNext( new Action1<String>() {
+                                                                  @Override
+                                                                  public void call( final String s ) {
+                                                                      sourceTypes.remove( id, s );
+                                                                  }
+                                                              } ).toBlocking().lastOrDefault( null );
+
+
+                                                            /**
+                                                             * Get the edge types to the target
+                                                             */
+                                                            gm.getEdgeTypesToTarget(
+                                                                    new SimpleSearchEdgeType( id, null, null ) )
+                                                              .doOnNext( new Action1<String>() {
+                                                                  @Override
+                                                                  public void call( final String s ) {
+                                                                      targetTypes.remove( id, s );
+                                                                  }
+                                                              } ).toBlocking().lastOrDefault( null );
+                                                        }
+                                                    }
+                                                }
+
+
+                                              ).toBlocking().lastOrDefault( null );
+
+
+        assertEquals( "All source types migrated", 0, sourceTypes.size() );
+
+
+        assertEquals( "All target types migrated", 0, targetTypes.size() );
     }
+}