You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/11/21 22:35:52 UTC
[14/19] incubator-usergrid git commit: Finished migration testing.
All core tests pass.
Finished migration testing. All core tests pass.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e32d5212
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e32d5212
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e32d5212
Branch: refs/heads/two-dot-o
Commit: e32d5212af3c53cc84c9a505cb9bb6824e6a16f2
Parents: 364a6ea
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Nov 20 15:48:19 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 20 15:48:19 2014 -0700
----------------------------------------------------------------------
.../migration/EntityDataMigration.java | 90 ++++---
.../rx/ApplicationObservable.java | 65 +++--
.../migration/EntityDataMigrationIT.java | 251 +++++++++++++++++++
.../migration/GraphShardVersionMigrationIT.java | 131 +++++-----
4 files changed, 394 insertions(+), 143 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e32d5212/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
index 79d31a8..767574d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
@@ -22,9 +22,7 @@ package org.apache.usergrid.corepersistence.migration;
import java.util.Iterator;
import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.usergrid.corepersistence.ManagerCache;
import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
@@ -53,9 +51,6 @@ import rx.functions.Action1;
public class EntityDataMigration implements DataMigration {
- private static final Logger logger = LoggerFactory.getLogger( EntityDataMigration.class );
-
-
private final MvccEntitySerializationStrategy v1Serialization;
private final MvccEntitySerializationStrategy v2Serialization;
@@ -77,58 +72,71 @@ public class EntityDataMigration implements DataMigration {
@Override
public void migrate( final ProgressObserver observer ) throws Throwable {
+ final AtomicLong atomicLong = new AtomicLong();
+
+ AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+ .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
- AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).doOnNext(
- new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+ @Override
+ public void call(
+ final AllEntitiesInSystemObservable.ApplicationEntityGroup
+ applicationEntityGroup ) {
- @Override
- public void call(
- final AllEntitiesInSystemObservable.ApplicationEntityGroup applicationEntityGroup ) {
+ final UUID now = UUIDGenerator.newTimeUUID();
- final UUID now = UUIDGenerator.newTimeUUID();
+ final Id appScopeId =
+ applicationEntityGroup.applicationScope.getApplication();
- final Id appScopeId = applicationEntityGroup.applicationScope.getApplication();
+ final MutationBatch totalBatch = keyspace.prepareMutationBatch();
- final MutationBatch totalBatch = keyspace.prepareMutationBatch();
+ //go through each entity in the system, and load it's entire
+ // history
+ for ( Id entityId : applicationEntityGroup.entityIds ) {
- for ( Id entityId : applicationEntityGroup.entityIds ) {
+ CollectionScope currentScope = CpNamingUtils
+ .getCollectionScopeNameFromEntityType( appScopeId,
+ entityId.getType() );
- CollectionScope currentScope = CpNamingUtils.getCollectionScopeNameFromEntityType(
- appScopeId, entityId.getType() );
+ //for each element in the history in the previous version,
+ // copy it to the CF in v2
+ Iterator<MvccEntity> allVersions = v1Serialization
+ .loadDescendingHistory( currentScope, entityId, now,
+ 1000 );
- Iterator<MvccEntity> allVersions =
- v1Serialization.loadDescendingHistory( currentScope, entityId, now, 1000 );
+ while ( allVersions.hasNext() ) {
+ final MvccEntity version = allVersions.next();
- while ( allVersions.hasNext() ) {
- final MvccEntity version = allVersions.next();
+ final MutationBatch versionBatch =
+ v2Serialization.write( currentScope, version );
- final MutationBatch versionBatch = v2Serialization.write( currentScope, version );
+ totalBatch.mergeShallow( versionBatch );
+
+ if ( atomicLong.incrementAndGet() % 50 == 0 ) {
+ executeBatch( totalBatch, observer, atomicLong );
+ }
+ }
+ }
+
+ executeBatch( totalBatch, observer, atomicLong );
+ }
+ } ).toBlocking().last();
+ }
- totalBatch.mergeShallow( versionBatch );
- if ( totalBatch.getRowCount() >= 50 ) {
- try {
- totalBatch.execute();
- }
- catch ( ConnectionException e ) {
- throw new DataMigrationException( "Unable to migrate batches ", e );
- }
- }
- }
- }
+ private void executeBatch( final MutationBatch batch, final ProgressObserver po, final AtomicLong count ) {
+ try {
+ batch.execute();
- try {
- totalBatch.execute();
- }
- catch ( ConnectionException e ) {
- throw new DataMigrationException( "Unable to migrate batches ", e );
- }
- }
- } ).toBlocking().last();
+ po.update( getVersion(), "Finished copying " + count + " entities to the new format" );
+ }
+ catch ( ConnectionException e ) {
+ po.failed( getVersion(), "Failed to execute mutation in cassandra" );
+ throw new DataMigrationException( "Unable to migrate batches ", e );
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e32d5212/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
index 898812b..6019bca 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
@@ -23,18 +23,20 @@ package org.apache.usergrid.corepersistence.rx;
import java.util.Arrays;
import java.util.UUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.usergrid.corepersistence.ManagerCache;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import rx.Observable;
@@ -47,14 +49,12 @@ import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicat
/**
* An observable that will emit all application stored in the system.
*/
-public class ApplicationObservable {
-
+public class ApplicationObservable {
+ private static final Logger logger = LoggerFactory.getLogger( ApplicationObservable.class );
/**
* Get all applicationIds as an observable
- * @param managerCache
- * @return
*/
public static Observable<Id> getAllApplicationIds( final ManagerCache managerCache ) {
@@ -62,22 +62,20 @@ public class ApplicationObservable {
//this way consumers can perform whatever work they need to on the root system first
- final Observable<Id> systemIds = Observable.from( Arrays.asList( generateApplicationId( CpNamingUtils.DEFAULT_APPLICATION_ID ),
- generateApplicationId( CpNamingUtils.MANAGEMENT_APPLICATION_ID ),
- generateApplicationId( CpNamingUtils.SYSTEM_APP_ID ) ) );
-
-
+ final Observable<Id> systemIds = Observable.from( Arrays
+ .asList( generateApplicationId( CpNamingUtils.DEFAULT_APPLICATION_ID ),
+ generateApplicationId( CpNamingUtils.MANAGEMENT_APPLICATION_ID ),
+ generateApplicationId( CpNamingUtils.SYSTEM_APP_ID ) ) );
final ApplicationScope appScope = getApplicationScope( CpNamingUtils.SYSTEM_APP_ID );
- final CollectionScope appInfoCollectionScope = new CollectionScopeImpl(
- appScope.getApplication(),
- appScope.getApplication(),
- CpNamingUtils.getCollectionScopeNameFromCollectionName( CpNamingUtils.APPINFOS ));
+ final CollectionScope appInfoCollectionScope =
+ new CollectionScopeImpl( appScope.getApplication(), appScope.getApplication(),
+ CpNamingUtils.getCollectionScopeNameFromCollectionName( CpNamingUtils.APPINFOS ) );
- final EntityCollectionManager
- collectionManager = managerCache.getEntityCollectionManager( appInfoCollectionScope );
+ final EntityCollectionManager collectionManager =
+ managerCache.getEntityCollectionManager( appInfoCollectionScope );
final GraphManager gm = managerCache.getGraphManager( appScope );
@@ -97,21 +95,34 @@ public class ApplicationObservable {
//get the app info and load it
final Id appInfo = edge.getTargetNode();
- return collectionManager.load( appInfo ).map( new Func1<org.apache.usergrid.persistence.model.entity.Entity, Id>() {
+ return collectionManager.load( appInfo )
+ //filter out null entities
+ .filter( new Func1<Entity, Boolean>() {
+ @Override
+ public Boolean call( final Entity entity ) {
+ if ( entity == null ) {
+ logger.warn( "Encountered a null application info for id {}", appInfo );
+ return false;
+ }
+ return true;
+ }
+ } )
+ //get the id from the entity
+ .map( new Func1<org.apache.usergrid.persistence.model.entity.Entity, Id>() {
- @Override
- public Id call( final org.apache.usergrid.persistence.model.entity.Entity entity ) {
- final UUID uuid = (UUID )entity.getField( "applicationUuid" ).getValue();
- return CpNamingUtils.generateApplicationId(uuid);
- }
- } );
+ @Override
+ public Id call( final org.apache.usergrid.persistence.model.entity.Entity entity ) {
+
+ final UUID uuid = ( UUID ) entity.getField( "applicationUuid" ).getValue();
+
+ return CpNamingUtils.generateApplicationId( uuid );
+ }
+ } );
}
} );
- return Observable.merge( systemIds, appIds);
+ return Observable.merge( systemIds, appIds );
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e32d5212/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
new file mode 100644
index 0000000..5c9e14c
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.migration;
+
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.usergrid.AbstractCoreIT;
+import org.apache.usergrid.corepersistence.CpSetup;
+import org.apache.usergrid.corepersistence.EntityWriteHelper;
+import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.SimpleEntityRef;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.core.guice.CurrentImpl;
+import org.apache.usergrid.persistence.core.guice.PreviousImpl;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Injector;
+import com.google.inject.Key;
+
+import rx.functions.Action1;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class EntityDataMigrationIT extends AbstractCoreIT {
+
+
+ private Injector injector;
+
+
+ private EntityDataMigration entityDataMigration;
+ private ManagerCache managerCache;
+ private DataMigrationManager dataMigrationManager;
+ private MigrationInfoSerialization migrationInfoSerialization;
+ private MvccEntitySerializationStrategy v1Strategy;
+ private MvccEntitySerializationStrategy v2Strategy;
+ private EntityManagerFactory emf;
+
+
+ @Before
+ public void setup() {
+ emf = setup.getEmf();
+ injector = CpSetup.getInjector();
+ entityDataMigration = injector.getInstance( EntityDataMigration.class );
+ managerCache = injector.getInstance( ManagerCache.class );
+ dataMigrationManager = injector.getInstance( DataMigrationManager.class );
+ migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
+ v1Strategy = injector.getInstance( Key.get(MvccEntitySerializationStrategy.class, PreviousImpl.class) );
+ v2Strategy = injector.getInstance( Key.get(MvccEntitySerializationStrategy.class, CurrentImpl.class) );
+ }
+
+
+ @Test
+ public void testDataMigration() throws Throwable {
+
+ assertEquals( "version 3 expected", 3, entityDataMigration.getVersion() );
+
+
+ /**
+ * Reset to our version -1 and start the migration
+ */
+ dataMigrationManager.resetToVersion( entityDataMigration.getVersion() - 1 );
+
+
+ final EntityManager newAppEm = app.getEntityManager();
+
+ final String type1 = "type1thing";
+ final String type2 = "type2thing";
+ final int size = 10;
+
+ final Set<Id> type1Identities = EntityWriteHelper.createTypes( newAppEm, type1, size );
+ final Set<Id> type2Identities = EntityWriteHelper.createTypes( newAppEm, type2, size );
+
+
+ final Set<Id> createdEntityIds = new HashSet<>();
+ createdEntityIds.addAll( type1Identities );
+ createdEntityIds.addAll( type2Identities );
+
+
+ final TestProgressObserver progressObserver = new TestProgressObserver();
+
+
+ //load everything that appears in v1, migrate and ensure it appears in v2
+ final Set<MvccEntity> savedEntities = new HashSet<>( 10000 );
+ //set that holds all entityIds for later assertion
+ final Set<Id> entityIds = new HashSet<>(10000);
+
+
+ //read everything in previous version format and put it into our types. Assumes we're
+ //using a test system, and it's not a huge amount of data, otherwise we'll overflow.
+
+ AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+ .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+ @Override
+ public void call(
+ final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
+
+ //add all versions from history to our comparison
+ for ( final Id id : entity.entityIds ) {
+
+ CollectionScope scope = CpNamingUtils
+ .getCollectionScopeNameFromEntityType(
+ entity.applicationScope.getApplication(),
+ id.getType() );
+
+ final Iterator<MvccEntity> versions = v1Strategy
+ .loadDescendingHistory( scope, id, UUIDGenerator.newTimeUUID(),
+ 100 );
+
+ while ( versions.hasNext() ) {
+
+ final MvccEntity mvccEntity = versions.next();
+
+ savedEntities.add( mvccEntity );
+
+ createdEntityIds.remove( mvccEntity.getId() );
+
+ entityIds.add( id );
+ }
+ }
+ }
+ } ).toBlocking().lastOrDefault( null );
+
+ assertEquals( "Newly saved entities encountered", 0, createdEntityIds.size() );
+ assertTrue( "Saved new entities", savedEntities.size() > 0 );
+
+ //perform the migration
+ entityDataMigration.migrate( progressObserver );
+
+ assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
+ assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
+
+
+ //write the status and version, then invalidate the cache so it appears
+ migrationInfoSerialization.setStatusCode( DataMigrationManagerImpl.StatusCode.COMPLETE.status );
+ migrationInfoSerialization.setVersion( entityDataMigration.getVersion() );
+ dataMigrationManager.invalidate();
+
+ assertEquals( "New version saved, and we should get new implementation", entityDataMigration.getVersion(),
+ dataMigrationManager.getCurrentVersion() );
+
+
+ //now visit all entities in the system again, load them from v2, and ensure they're the same
+ AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+ .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+ @Override
+ public void call(
+ final AllEntitiesInSystemObservable
+ .ApplicationEntityGroup entity ) {
+ //add all versions from history to our comparison
+ for ( final Id id : entity.entityIds ) {
+
+ CollectionScope scope = CpNamingUtils
+ .getCollectionScopeNameFromEntityType(
+ entity.applicationScope.getApplication(),
+ id.getType() );
+
+ final Iterator<MvccEntity> versions = v2Strategy
+ .loadDescendingHistory( scope, id,
+ UUIDGenerator.newTimeUUID(), 100 );
+
+ while ( versions.hasNext() ) {
+
+ final MvccEntity mvccEntity = versions.next();
+
+ savedEntities.remove( mvccEntity );
+ }
+ }
+ }
+ }
+
+
+ ).toBlocking().lastOrDefault( null );
+
+
+ assertEquals( "All entities migrated", 0, savedEntities.size() );
+
+
+ //now visit all entities in the system again, and load them from the EM, ensure we see everything we did in the v1 traversal
+ AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+ .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+ @Override
+ public void call(
+ final AllEntitiesInSystemObservable
+ .ApplicationEntityGroup entity ) {
+
+ final EntityManager em = emf.getEntityManager( entity.applicationScope.getApplication().getUuid() );
+
+ //add all versions from history to our comparison
+ for ( final Id id : entity.entityIds ) {
+
+
+ try {
+ final Entity emEntity = em.get( SimpleEntityRef.fromId( id ) );
+
+ if(emEntity != null){
+ entityIds.remove( id );
+ }
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException("Error loading entity", e);
+ }
+ }
+ }
+ }
+
+
+ ).toBlocking().lastOrDefault( null );
+
+
+ assertEquals("All entities could be loaded by the entity manager", 0, entityIds.size());
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e32d5212/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
index 88c02cd..51ea052 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
@@ -42,7 +42,6 @@ import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.inject.Injector;
-import com.netflix.astyanax.Keyspace;
import rx.functions.Action1;
@@ -55,7 +54,6 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
private Injector injector;
private GraphShardVersionMigration graphShardVersionMigration;
- private Keyspace keyspace;
private ManagerCache managerCache;
private DataMigrationManager dataMigrationManager;
private MigrationInfoSerialization migrationInfoSerialization;
@@ -65,7 +63,6 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
public void setup() {
injector = CpSetup.getInjector();
graphShardVersionMigration = injector.getInstance( GraphShardVersionMigration.class );
- keyspace = injector.getInstance( Keyspace.class );
managerCache = injector.getInstance( ManagerCache.class );
dataMigrationManager = injector.getInstance( DataMigrationManager.class );
migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
@@ -75,15 +72,12 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
@Test
public void testIdMapping() throws Throwable {
- assertEquals("version 2 expected", 2, graphShardVersionMigration.getVersion());
+ assertEquals( "version 2 expected", 2, graphShardVersionMigration.getVersion() );
/**
* Reset to our version -1 and start the migration
*/
- dataMigrationManager.resetToVersion( graphShardVersionMigration.getVersion()-1 );
-
-
-
+ dataMigrationManager.resetToVersion( graphShardVersionMigration.getVersion() - 1 );
final EntityManager newAppEm = app.getEntityManager();
@@ -111,15 +105,16 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
//read everything in previous version format and put it into our types.
- AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000)
+ AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
.doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
@Override
- public void call( final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
+ public void call(
+ final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
final GraphManager gm =
managerCache.getGraphManager( entity.applicationScope );
- for(final Id id: entity.entityIds) {
+ for ( final Id id : entity.entityIds ) {
/**
* Get our edge types from the source
*/
@@ -135,8 +130,7 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
/**
* Get the edge types to the target
*/
- gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( id,
- null, null ) )
+ gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( id, null, null ) )
.doOnNext( new Action1<String>() {
@Override
public void call( final String s ) {
@@ -153,7 +147,7 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
//perform the migration
graphShardVersionMigration.migrate( progressObserver );
- assertEquals("Newly saved entities encounterd", 0, allEntities.size());
+ assertEquals( "Newly saved entities encounterd", 0, allEntities.size() );
assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
@@ -163,70 +157,57 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
migrationInfoSerialization.setVersion( graphShardVersionMigration.getVersion() );
dataMigrationManager.invalidate();
- assertEquals("New version saved, and we should get new implementation", graphShardVersionMigration.getVersion(), dataMigrationManager.getCurrentVersion());
+ assertEquals( "New version saved, and we should get new implementation",
+ graphShardVersionMigration.getVersion(), dataMigrationManager.getCurrentVersion() );
//now visit all nodes in the system and remove their types from the multi maps, it should be empty at the end
AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
.doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
- @Override
- public void call(
- final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
-
- final GraphManager gm =
- managerCache.getGraphManager( entity.applicationScope );
-
- for ( final Id id : entity.entityIds ) {
- /**
- * Get our edge types from the source
- */
- gm.getEdgeTypesFromSource(
- new SimpleSearchEdgeType( id, null, null ) )
- .doOnNext( new Action1<String>() {
- @Override
- public void call( final String s ) {
- sourceTypes.remove( id, s );
- }
- } ).toBlocking().lastOrDefault( null );
-
-
- /**
- * Get the edge types to the target
- */
- gm.getEdgeTypesToTarget(
- new SimpleSearchEdgeType( id, null, null ) )
- .doOnNext( new Action1<String>() {
- @Override
- public void call( final String s ) {
- targetTypes.remove( id, s );
- }
- } ).toBlocking().lastOrDefault( null );
- }
- }
- }
-
-
- ).
-
-
- toBlocking()
-
-
- .
-
-
- lastOrDefault( null );
-
-
- assertEquals( "All source types migrated",0,sourceTypes.size( )
-
-
- );
-
-
- assertEquals( "All target types migrated",0,targetTypes.size( )
-
-
- );
- }
+ @Override
+ public void call(
+ final AllEntitiesInSystemObservable
+ .ApplicationEntityGroup entity ) {
+
+ final GraphManager gm =
+ managerCache.getGraphManager( entity.applicationScope );
+
+ for ( final Id id : entity.entityIds ) {
+ /**
+ * Get our edge types from the source
+ */
+ gm.getEdgeTypesFromSource(
+ new SimpleSearchEdgeType( id, null, null ) )
+ .doOnNext( new Action1<String>() {
+ @Override
+ public void call( final String s ) {
+ sourceTypes.remove( id, s );
+ }
+ } ).toBlocking().lastOrDefault( null );
+
+
+ /**
+ * Get the edge types to the target
+ */
+ gm.getEdgeTypesToTarget(
+ new SimpleSearchEdgeType( id, null, null ) )
+ .doOnNext( new Action1<String>() {
+ @Override
+ public void call( final String s ) {
+ targetTypes.remove( id, s );
+ }
+ } ).toBlocking().lastOrDefault( null );
+ }
+ }
+ }
+
+
+ ).toBlocking().lastOrDefault( null );
+
+
+ assertEquals( "All source types migrated", 0, sourceTypes.size() );
+
+
+ assertEquals( "All target types migrated", 0, targetTypes.size() );
}
+}