You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/16 19:08:23 UTC
incubator-usergrid git commit: adding specific migration types
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-365 e52246952 -> 79ec9787c
adding specific migration types
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/79ec9787
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/79ec9787
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/79ec9787
Branch: refs/heads/USERGRID-365
Commit: 79ec9787c185b3e1a43aba5f456b5568b5735235
Parents: e522469
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Feb 16 10:07:55 2015 -0800
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Feb 16 10:07:55 2015 -0800
----------------------------------------------------------------------
.../usergrid/corepersistence/GuiceModule.java | 5 +-
.../migration/EntityTypeMappingMigration.java | 48 +++--
.../impl/AllEntitiesInSystemObservableImpl.java | 112 +++++------
.../rx/impl/ApplicationObservableImpl.java | 12 ++
.../migration/EntityDataMigrationIT.java | 24 +--
.../org/apache/usergrid/persistence/GeoIT.java | 5 +-
.../impl/MvccEntityDataMigrationImpl.java | 97 +++++-----
.../serialization/impl/SerializationModule.java | 9 +-
.../persistence/core/guice/CommonModule.java | 9 +-
.../data/ApplicationDataMigration.java | 38 ++++
.../migration/data/CollectionDataMigration.java | 36 ++++
.../core/migration/data/DataMigration.java | 12 +-
.../data/DataMigrationManagerImpl.java | 193 ++++++++++++-------
.../core/rx/AllEntitiesInSystemObservable.java | 6 +-
.../core/rx/ApplicationObservable.java | 6 +
.../core/guice/MaxMigrationModule.java | 3 +-
.../core/guice/MaxMigrationVersion.java | 9 +-
.../data/DataMigrationManagerImplTest.java | 70 ++++---
.../core/rx/ApplicationsTestObservable.java | 6 +
.../persistence/graph/guice/GraphModule.java | 9 +-
.../impl/EdgeDataMigrationImpl.java | 122 ++++++------
21 files changed, 495 insertions(+), 336 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
index 4c1e5c3..7098572 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
import org.apache.usergrid.persistence.collection.guice.CollectionModule;
import org.apache.usergrid.persistence.core.guice.CommonModule;
+import org.apache.usergrid.persistence.core.migration.data.CollectionDataMigration;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
@@ -79,8 +80,8 @@ public class GuiceModule extends AbstractModule {
bind(AllEntitiesInSystemObservable.class).to( AllEntitiesInSystemObservableImpl.class );
bind(ApplicationObservable.class).to( ApplicationObservableImpl.class );
- Multibinder<DataMigration> dataMigrationMultibinder =
- Multibinder.newSetBinder( binder(), DataMigration.class );
+ Multibinder<CollectionDataMigration> dataMigrationMultibinder =
+ Multibinder.newSetBinder( binder(), CollectionDataMigration.class );
dataMigrationMultibinder.addBinding().to( EntityTypeMappingMigration.class );
Multibinder<EntityDeleted> entityBinder =
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
index 81503d5..4dbc373 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.usergrid.corepersistence.ManagerCache;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.migration.data.CollectionDataMigration;
import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
@@ -46,7 +47,7 @@ import rx.schedulers.Schedulers;
/**
* Migration to ensure that our entity id is written into our map data
*/
-public class EntityTypeMappingMigration implements DataMigration {
+public class EntityTypeMappingMigration implements CollectionDataMigration {
private final ManagerCache managerCache;
private final AllEntitiesInSystemObservable allEntitiesInSystemObservable;
@@ -60,29 +61,34 @@ public class EntityTypeMappingMigration implements DataMigration {
@Override
- public Observable migrate(final ApplicationEntityGroup applicationEntityGroup, final ProgressObserver observer) throws Throwable {
+ public Observable migrate(final Observable<ApplicationEntityGroup> applicationEntityGroupObservable, final ProgressObserver observer) throws Throwable {
final AtomicLong atomicLong = new AtomicLong();
- final MapScope ms = CpNamingUtils.getEntityTypeMapScope(applicationEntityGroup.applicationScope.getApplication());
-
- final MapManager mapManager = managerCache.getMapManager(ms);
- return Observable.from(applicationEntityGroup.entityIds)
- .subscribeOn(Schedulers.io())
- .map(new Func1<EntityIdScope, Long>() {
- @Override
- public Long call(EntityIdScope idScope) {
- final UUID entityUuid = idScope.getId().getUuid();
- final String entityType = idScope.getId().getType();
-
- mapManager.putString(entityUuid.toString(), entityType);
-
- if (atomicLong.incrementAndGet() % 100 == 0) {
- updateStatus(atomicLong, observer);
- }
- return atomicLong.get();
- }
- });
+ return applicationEntityGroupObservable.flatMap(new Func1<ApplicationEntityGroup, Observable<Long>>() {
+ @Override
+ public Observable call(final ApplicationEntityGroup applicationEntityGroup) {
+ final MapScope ms = CpNamingUtils.getEntityTypeMapScope(applicationEntityGroup.applicationScope.getApplication());
+
+ final MapManager mapManager = managerCache.getMapManager(ms);
+ return Observable.from(applicationEntityGroup.entityIds)
+ .subscribeOn(Schedulers.io())
+ .map(new Func1<EntityIdScope, Long>() {
+ @Override
+ public Long call(EntityIdScope idScope) {
+ final UUID entityUuid = idScope.getId().getUuid();
+ final String entityType = idScope.getId().getType();
+
+ mapManager.putString(entityUuid.toString(), entityType);
+
+ if (atomicLong.incrementAndGet() % 100 == 0) {
+ updateStatus(atomicLong, observer);
+ }
+ return atomicLong.get();
+ }
+ });
+ }
+ });
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
index a4fbc5d..55667cb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
@@ -53,71 +53,75 @@ public class AllEntitiesInSystemObservableImpl implements AllEntitiesInSystemObs
private final TargetIdObservable targetIdObservable;
@Inject
- public AllEntitiesInSystemObservableImpl(ApplicationObservable applicationObservable, GraphManagerFactory graphManagerFactory, TargetIdObservable targetIdObservable){
+ public AllEntitiesInSystemObservableImpl(ApplicationObservable applicationObservable, GraphManagerFactory graphManagerFactory, TargetIdObservable targetIdObservable) {
this.applicationObservable = applicationObservable;
this.graphManagerFactory = graphManagerFactory;
this.targetIdObservable = targetIdObservable;
}
- public Observable<ApplicationEntityGroup<CollectionScope>> getAllEntitiesInSystem(final int bufferSize) {
- return getAllEntitiesInSystem(applicationObservable.getAllApplicationIds( ),bufferSize);
+ public Observable<ApplicationEntityGroup<CollectionScope>> getAllEntitiesInSystem(final int bufferSize) {
+ return getAllEntitiesInSystem(applicationObservable.getAllApplicationIds().map(new Func1<Id, ApplicationScope>() {
+ @Override
+ public ApplicationScope call(Id id) {
+ return new ApplicationScopeImpl(id);
+ }
+ }), bufferSize);
}
- public Observable<ApplicationEntityGroup<CollectionScope>> getAllEntitiesInSystem(final Observable<Id> appIdsObservable, final int bufferSize) {
+ public Observable<ApplicationEntityGroup<CollectionScope>> getAllEntitiesInSystem(final Observable<ApplicationScope> appIdsObservable, final int bufferSize) {
//traverse all nodes in the graph, load all source edges from them, then re-save the meta data
- return appIdsObservable.flatMap(new Func1<Id, Observable<ApplicationEntityGroup<CollectionScope>>>() {
- @Override
- public Observable<ApplicationEntityGroup<CollectionScope>> call(final Id applicationId) {
-
- //set up our application scope and graph manager
- final ApplicationScope applicationScope = new ApplicationScopeImpl(
- applicationId);
-
- final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope);
-
- //load all nodes that are targets of our application node. I.E.
- // entities that have been saved
- final Observable<Id> entityNodes =
- targetIdObservable.getTargetNodes(gm, applicationId);
-
-
- //get scope here
-
-
- //emit Scope + ID
-
- //create our application node to emit since it's an entity as well
- final Observable<Id> applicationNode = Observable.just(applicationId);
-
- //merge both the specified application node and the entity node
- // so they all get used
- return Observable
- .merge(applicationNode, entityNodes)
- .buffer(bufferSize)
- .map(new Func1<List<Id>, List<EntityIdScope<CollectionScope>>>() {
- @Override
- public List<EntityIdScope<CollectionScope>> call(List<Id> ids) {
- List<EntityIdScope<CollectionScope>> scopes = new ArrayList<>(ids.size());
- for (Id id : ids) {
- CollectionScope scope = CpNamingUtils.getCollectionScopeNameFromEntityType(applicationId, id.getType());
- EntityIdScope<CollectionScope> idScope = new EntityIdScope<>(id, scope);
- scopes.add(idScope);
- }
- return scopes;
- }
- })
- .map(new Func1<List<EntityIdScope<CollectionScope>>, ApplicationEntityGroup<CollectionScope>>() {
- @Override
- public ApplicationEntityGroup<CollectionScope> call(final List<EntityIdScope<CollectionScope>> scopes) {
- return new ApplicationEntityGroup<>(applicationScope, scopes);
- }
- });
- }
- } );
+ return appIdsObservable
+ .flatMap(new Func1<ApplicationScope, Observable<ApplicationEntityGroup<CollectionScope>>>() {
+ @Override
+ public Observable<ApplicationEntityGroup<CollectionScope>> call(final ApplicationScope applicationScope) {
+ return getAllEntities(applicationScope, bufferSize);
+ }
+ });
}
-
+ private Observable<ApplicationEntityGroup<CollectionScope>> getAllEntities(final ApplicationScope applicationScope, final int bufferSize) {
+ final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope);
+ final Id applicationId = applicationScope.getApplication();
+
+ //load all nodes that are targets of our application node. I.E.
+ // entities that have been saved
+ final Observable<Id> entityNodes =
+ targetIdObservable.getTargetNodes(gm, applicationId);
+
+
+ //get scope here
+
+
+ //emit Scope + ID
+
+ //create our application node to emit since it's an entity as well
+ final Observable<Id> applicationNode = Observable.just(applicationId);
+
+ //merge both the specified application node and the entity node
+ // so they all get used
+ return Observable
+ .merge(applicationNode, entityNodes)
+ .buffer(bufferSize)
+ .map(new Func1<List<Id>, List<EntityIdScope<CollectionScope>>>() {
+ @Override
+ public List<EntityIdScope<CollectionScope>> call(List<Id> ids) {
+ List<EntityIdScope<CollectionScope>> scopes = new ArrayList<>(ids.size());
+ for (Id id : ids) {
+ CollectionScope scope = CpNamingUtils.getCollectionScopeNameFromEntityType(applicationId, id.getType());
+ EntityIdScope<CollectionScope> idScope = new EntityIdScope<>(id, scope);
+ scopes.add(idScope);
+ }
+ return scopes;
+ }
+ })
+ .map(new Func1<List<EntityIdScope<CollectionScope>>, ApplicationEntityGroup<CollectionScope>>() {
+ @Override
+ public ApplicationEntityGroup<CollectionScope> call(final List<EntityIdScope<CollectionScope>> scopes) {
+ return new ApplicationEntityGroup<>(applicationScope, scopes);
+ }
+ });
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java
index 9a5e084..f4a1057 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java
@@ -26,6 +26,7 @@ import java.util.UUID;
import com.google.inject.Inject;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -134,4 +135,15 @@ public class ApplicationObservableImpl implements ApplicationObservable {
return Observable.merge( systemIds, appIds );
}
+
+ @Override
+ public Observable<ApplicationScope> getAllApplicationScopes() {
+ return getAllApplicationIds().map(new Func1<Id, ApplicationScope>() {
+ @Override
+ public ApplicationScope call(Id id) {
+ ApplicationScope scope = new ApplicationScopeImpl(id);
+ return scope;
+ }
+ });
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
index 2da1f37..4d87f8b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
@@ -27,8 +27,9 @@ import java.util.Set;
import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntityDataMigrationImpl;
import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyProxyV2Impl;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.migration.data.*;
import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
import org.apache.usergrid.persistence.core.scope.EntityIdScope;
import org.junit.Before;
@@ -47,9 +48,6 @@ import org.apache.usergrid.persistence.SimpleEntityRef;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
-import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
-import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
@@ -72,7 +70,7 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
private Injector injector;
- private DataMigration entityDataMigration;
+ private CollectionDataMigration entityDataMigration;
private DataMigrationManager dataMigrationManager;
private MigrationInfoSerialization migrationInfoSerialization;
private MvccEntitySerializationStrategy v1Strategy;
@@ -89,6 +87,7 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
new MigrationTestRule( app, SpringResource.getInstance().getBean( Injector.class ) ,MvccEntityDataMigrationImpl.class );
private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
+ private ApplicationObservable applicationObservable;
@Before
@@ -101,6 +100,8 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
MvccEntityMigrationStrategy strategy = injector.getInstance(Key.get(MvccEntityMigrationStrategy.class));
allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
+ applicationObservable = injector.getInstance(ApplicationObservable.class);
+
v1Strategy = strategy.getMigration().from();
v2Strategy = strategy.getMigration().to();
}
@@ -177,18 +178,9 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
assertTrue( "Saved new entities", savedEntities.size() > 0 );
//perform the migration
- allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
- .doOnNext(new Action1<ApplicationEntityGroup>() {
- @Override
- public void call(ApplicationEntityGroup applicationEntityGroup) {
- try {
- entityDataMigration.migrate(applicationEntityGroup, progressObserver).toBlocking().last();
- }catch (Throwable e){
- throw new RuntimeException(e);
- }
- }
- }).toBlocking().last();
+ rx.Observable<ApplicationEntityGroup> oRx = allEntitiesInSystemObservable.getAllEntitiesInSystem(applicationObservable.getAllApplicationScopes(), 1000);
+ entityDataMigration.migrate(oRx, progressObserver).toBlocking().last();
assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
index a1ac4ff..ff15063 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
@@ -177,7 +177,7 @@ public class GeoIT extends AbstractCoreIT {
Map<String, Object> restaurantProps = new LinkedHashMap<String, Object>();
restaurantProps.put("name", "Brickhouse");
restaurantProps.put("address", "426 Brannan Street");
- restaurantProps.put("location", getLocation(37.779632, -122.395131));
+ restaurantProps.put("location", getLocation(37.776753, -122.407846));
Entity restaurant = em.create("restaurant", restaurantProps);
assertNotNull(restaurant);
@@ -193,6 +193,7 @@ public class GeoIT extends AbstractCoreIT {
Entity user = em.create("user", userProperties);
assertNotNull(user);
+ em.refreshIndex();
//3. Create a connection between the user and the entity
em.createConnection(user, "likes", restaurant);
@@ -200,7 +201,7 @@ public class GeoIT extends AbstractCoreIT {
em.refreshIndex();
//4. Test that the user is within 2000m of the entity
Results emSearchResults = em.searchConnectedEntities(user,
- Query.fromQL("location within 2000 of "
+ Query.fromQL("location within 5000 of "
+ ((LinkedHashMap<String, Object>) userProperties.get("location")).get("latitude")
+ ", " + ((LinkedHashMap<String, Object>)
userProperties.get("location")).get("longitude")).setConnectionType("likes"));
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java
index bae24f4..7951383 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java
@@ -27,6 +27,7 @@ import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.core.migration.data.CollectionDataMigration;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
import org.apache.usergrid.persistence.core.migration.schema.MigrationStrategy;
@@ -48,7 +49,7 @@ import java.util.concurrent.atomic.AtomicLong;
/**
* Data migration strategy for entities
*/
-public class MvccEntityDataMigrationImpl implements DataMigration {
+public class MvccEntityDataMigrationImpl implements CollectionDataMigration {
private final Keyspace keyspace;
private final MvccEntityMigrationStrategy entityMigrationStrategy;
@@ -61,59 +62,65 @@ public class MvccEntityDataMigrationImpl implements DataMigration {
}
@Override
- public Observable migrate(final ApplicationEntityGroup applicationEntityGroup, final DataMigration.ProgressObserver observer) {
+ public Observable migrate(final Observable<ApplicationEntityGroup> applicationEntityGroupObservable, final DataMigration.ProgressObserver observer) {
final AtomicLong atomicLong = new AtomicLong();
final MutationBatch totalBatch = keyspace.prepareMutationBatch();
- final List<EntityIdScope<CollectionScope>> entityIds = applicationEntityGroup.entityIds;
final UUID now = UUIDGenerator.newTimeUUID();
- //go through each entity in the system, and load it's entire
- // history
- return Observable.from(entityIds)
- .subscribeOn(Schedulers.io())
- .map(new Func1<EntityIdScope<CollectionScope>, Id>() {
- @Override
- public Id call(EntityIdScope<CollectionScope> idScope) {
-
- ApplicationScope applicationScope = applicationEntityGroup.applicationScope;
- MigrationStrategy.MigrationRelationship<MvccEntitySerializationStrategy> migration = entityMigrationStrategy.getMigration();
-
- if (idScope.getCollectionScope() instanceof CollectionScope) {
- CollectionScope currentScope = idScope.getCollectionScope();
- //for each element in the history in the previous version,
- // copy it to the CF in v2
- Iterator<MvccEntity> allVersions = migration.from()
- .loadDescendingHistory(currentScope, idScope.getId(), now,
- 1000);
-
- while (allVersions.hasNext()) {
- final MvccEntity version = allVersions.next();
-
- final MutationBatch versionBatch =
- migration.to().write(currentScope, version);
-
- totalBatch.mergeShallow(versionBatch);
-
- if (atomicLong.incrementAndGet() % 50 == 0) {
+ return applicationEntityGroupObservable.flatMap(new Func1<ApplicationEntityGroup, Observable<Id>>() {
+ @Override
+ public Observable call(final ApplicationEntityGroup applicationEntityGroup) {
+ final List<EntityIdScope<CollectionScope>> entityIds = applicationEntityGroup.entityIds;
+
+ //go through each entity in the system, and load its entire
+ // history
+ return Observable.from(entityIds)
+ .subscribeOn(Schedulers.io())
+ .map(new Func1<EntityIdScope<CollectionScope>, Id>() {
+ @Override
+ public Id call(EntityIdScope<CollectionScope> idScope) {
+
+ MigrationStrategy.MigrationRelationship<MvccEntitySerializationStrategy> migration = entityMigrationStrategy.getMigration();
+
+ if (idScope.getCollectionScope() instanceof CollectionScope) {
+ CollectionScope currentScope = idScope.getCollectionScope();
+ //for each element in the history in the previous version,
+ // copy it to the CF in v2
+ Iterator<MvccEntity> allVersions = migration.from()
+ .loadDescendingHistory(currentScope, idScope.getId(), now,
+ 1000);
+
+ while (allVersions.hasNext()) {
+ final MvccEntity version = allVersions.next();
+
+ final MutationBatch versionBatch =
+ migration.to().write(currentScope, version);
+
+ totalBatch.mergeShallow(versionBatch);
+
+ if (atomicLong.incrementAndGet() % 50 == 0) {
+ executeBatch(totalBatch, observer, atomicLong);
+ }
+ }
executeBatch(totalBatch, observer, atomicLong);
}
+
+ return idScope.getId();
}
- executeBatch(totalBatch, observer, atomicLong);
- }
-
- return idScope.getId();
- }
- })
- .buffer(100)
- .doOnNext(new Action1<List<Id>>() {
- @Override
- public void call(List<Id> ids) {
- executeBatch(totalBatch, observer, atomicLong);
-
- }
- });
+ })
+ .buffer(100)
+ .doOnNext(new Action1<List<Id>>() {
+ @Override
+ public void call(List<Id> ids) {
+ executeBatch(totalBatch, observer, atomicLong);
+
+ }
+ });
+ }
+ });
+
}
protected void executeBatch( final MutationBatch batch, final DataMigration.ProgressObserver po, final AtomicLong count ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
index c119d84..b3388bc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
@@ -23,6 +23,7 @@ import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerialization
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.core.guice.*;
+import org.apache.usergrid.persistence.core.migration.data.CollectionDataMigration;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
@@ -50,13 +51,13 @@ public class SerializationModule extends AbstractModule {
bind( MvccEntitySerializationStrategy.class ).annotatedWith( V3Impl.class )
.to(MvccEntitySerializationStrategyV3Impl.class);
- bind(MvccEntitySerializationStrategy.class).annotatedWith( V1ProxyImpl.class )
+ bind(MvccEntitySerializationStrategy.class).annotatedWith(V1ProxyImpl.class)
.to(MvccEntitySerializationStrategyProxyV1Impl.class);
- bind(MvccEntitySerializationStrategy.class ).annotatedWith( ProxyImpl.class )
+ bind(MvccEntitySerializationStrategy.class ).annotatedWith(ProxyImpl.class)
.to(MvccEntitySerializationStrategyProxyV2Impl.class);
- Multibinder<DataMigration> dataMigrationMultibinder =
- Multibinder.newSetBinder( binder(), DataMigration.class );
+ Multibinder<CollectionDataMigration> dataMigrationMultibinder =
+ Multibinder.newSetBinder( binder(), CollectionDataMigration.class );
dataMigrationMultibinder.addBinding().to( MvccEntityDataMigrationImpl.class );
bind( MvccEntityMigrationStrategy.class ).to(MvccEntitySerializationStrategyProxyV2Impl.class);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index f2adee5..1e428e5 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -19,6 +19,7 @@
package org.apache.usergrid.persistence.core.guice;
+import org.apache.usergrid.persistence.core.migration.data.*;
import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.persistence.core.astyanax.AstyanaxKeyspaceProvider;
@@ -27,11 +28,6 @@ import org.apache.usergrid.persistence.core.astyanax.CassandraConfigImpl;
import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.consistency.TimeServiceImpl;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
-import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
-import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
-import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerializationImpl;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
import org.apache.usergrid.persistence.core.migration.schema.MigrationManagerFig;
@@ -81,7 +77,8 @@ public class CommonModule extends AbstractModule {
//do multibindings for migrations
- Multibinder<DataMigration> dataMigrationMultibinder = Multibinder.newSetBinder( binder(), DataMigration.class );
+ Multibinder<ApplicationDataMigration> applicationDataMigrationMultibinder = Multibinder.newSetBinder( binder(), ApplicationDataMigration.class );
+ Multibinder<CollectionDataMigration> collectionDataMigrationMultibinder = Multibinder.newSetBinder( binder(), CollectionDataMigration.class );
// dataMigrationMultibinder.addBinding();
// dataMigrationManagerMultibinder.addBinding().to( DataMigrationManagerImpl.class );
// migrationBinding.addBinding().to( Key.get( MigrationInfoSerialization.class ) );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/ApplicationDataMigration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/ApplicationDataMigration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/ApplicationDataMigration.java
new file mode 100644
index 0000000..726ff3a
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/ApplicationDataMigration.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.core.migration.data;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import rx.Observable;
+
+/**
+ * Migrate applications
+ */
+public interface ApplicationDataMigration extends DataMigration {
+
+ /**
+ * Migrate the data to the specified version
+ * @param observer
+ * @throws Throwable
+ */
+ public Observable migrate(final Observable<ApplicationScope> applicationEntityGroup, final ProgressObserver observer) throws Throwable;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/CollectionDataMigration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/CollectionDataMigration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/CollectionDataMigration.java
new file mode 100644
index 0000000..24dab0f
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/CollectionDataMigration.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.core.migration.data;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import rx.Observable;
+
+/**
+ * Migrate Collections
+ */
+public interface CollectionDataMigration extends DataMigration {
+ /**
+ * Migrate the data to the specified version
+ * @param observer
+ * @throws Throwable
+ */
+ public Observable migrate(final Observable<ApplicationEntityGroup> applicationEntityGroup,final ProgressObserver observer) throws Throwable;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
index cae5623..495552e 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
@@ -43,12 +43,6 @@ import rx.Observable;
public interface DataMigration {
- /**
- * Migrate the data to the specified version
- * @param observer
- * @throws Throwable
- */
- public Observable migrate(final ApplicationEntityGroup applicationEntityGroup,final ProgressObserver observer) throws Throwable;
/**
* Get the version of this migration. It must be unique.
@@ -85,9 +79,9 @@ public interface DataMigration {
public enum MigrationType{
Entities,
- Edges,
- Index,
- Other
+ Applications,
+ System,
+ Collections
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
index b8e4af9..58ba6ff 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
@@ -21,29 +21,18 @@ package org.apache.usergrid.persistence.core.migration.data;
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.core.scope.EntityIdScope;
-import org.apache.usergrid.persistence.model.entity.Id;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.migration.schema.MigrationException;
import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import rx.Observable;
@@ -63,20 +52,25 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
private final AllEntitiesInSystemObservable allEntitiesInSystemObservable;
- private final Set<DataMigration> migrations;
private final ApplicationObservable applicationObservable;
+ private final Set<ApplicationDataMigration> appMigrations;
+ private final Set<CollectionDataMigration> collectionMigrations;
@Inject
public DataMigrationManagerImpl( final MigrationInfoSerialization migrationInfoSerialization,
- final Set<DataMigration> migrations,
+ final Set<ApplicationDataMigration> appMigrations,
+ final Set<CollectionDataMigration> collectionMigrations,
final AllEntitiesInSystemObservable allEntitiesInSystemObservable,
final ApplicationObservable applicationObservable
) {
+ this.appMigrations = appMigrations;
+ this.collectionMigrations = collectionMigrations;
Preconditions.checkNotNull( migrationInfoSerialization,
"migrationInfoSerialization must not be null" );
- Preconditions.checkNotNull( migrations, "migrations must not be null" );
+ Preconditions.checkNotNull( collectionMigrations, "migrations must not be null" );
+ Preconditions.checkNotNull( appMigrations, "migrations must not be null" );
Preconditions.checkNotNull( allEntitiesInSystemObservable, "allentitiesobservable must not be null" );
Preconditions.checkNotNull( applicationObservable, "applicationObservable must not be null" );
@@ -84,7 +78,6 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
this.allEntitiesInSystemObservable = allEntitiesInSystemObservable;
this.applicationObservable = applicationObservable;
- this.migrations = migrations;
}
@@ -106,100 +99,160 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
migrationTreeMap.lastKey() );
//we have our migrations to run, execute them
- final NavigableMap<Integer, DataMigration> migrationsToRun =
- migrationTreeMap.tailMap( currentVersion, false );
+ final Collection< DataMigration> migrationsToRun = migrationTreeMap.tailMap( currentVersion, false ).values();
+
+
final CassandraProgressObserver observer = new CassandraProgressObserver();
- Observable<DataMigration> migrations = Observable.from(migrationsToRun.values()).subscribeOn(Schedulers.io());
- final Observable appIdObservable = applicationObservable.getAllApplicationIds();
+ final Observable<ApplicationScope> appScopeObservable = applicationObservable.getAllApplicationScopes();
+
+ final Observable<ApplicationEntityGroup> entitiesObservable = appScopeObservable.flatMap(new Func1<ApplicationScope, Observable<ApplicationEntityGroup>>() { // entity groups for all applications, fetched one application scope at a time
+ @Override
+ public Observable<ApplicationEntityGroup> call(ApplicationScope applicationScope) {
+ return allEntitiesInSystemObservable.getAllEntitiesInSystem(Observable.just(applicationScope), 1000); // pass only the emitted scope: handing in appScopeObservable here re-walks every application once per scope and emits duplicate groups
+ }
+ });
- Observable entityMigrations = migrations
+ final Observable<ApplicationDataMigration> applicationMigrationsRx = Observable.from(migrationsToRun)
.filter(new Func1<DataMigration, Boolean>() {
@Override
public Boolean call(DataMigration dataMigration) {
- return dataMigration.getType() == DataMigration.MigrationType.Entities;
+ return dataMigration instanceof ApplicationDataMigration;
}
- }).flatMap(new Func1<DataMigration, Observable<ApplicationEntityGroup>>() {
+ }).map(new Func1<DataMigration, ApplicationDataMigration>() {
@Override
- public Observable<ApplicationEntityGroup> call(final DataMigration dataMigration) {
- return allEntitiesInSystemObservable
- .getAllEntitiesInSystem(appIdObservable, 1000)
- .doOnNext(new Action1<ApplicationEntityGroup>() {
- @Override
- public void call(ApplicationEntityGroup entityGroup) {
- runMigration(dataMigration,observer,entityGroup);
- }
- });
+ public ApplicationDataMigration call(DataMigration dataMigration) {
+ return (ApplicationDataMigration)dataMigration;
}
});
- //migrations that aren't entities
- Observable otherMigrations = migrations
+ final Observable<CollectionDataMigration> collectionMigrationsRx = Observable.from(migrationsToRun)
.filter(new Func1<DataMigration, Boolean>() {
@Override
public Boolean call(DataMigration dataMigration) {
- return dataMigration.getType() != DataMigration.MigrationType.Entities;
+ return dataMigration instanceof CollectionDataMigration;
}
- }).flatMap(new Func1<DataMigration, Observable<?>>() {
+ }).map(new Func1<DataMigration, CollectionDataMigration>() {
@Override
- public Observable call(final DataMigration dataMigration) {
- return appIdObservable.doOnNext(new Action1<Id>() {
+ public CollectionDataMigration call(DataMigration dataMigration) {
+ return (CollectionDataMigration) dataMigration;
+ }
+ });
+
+ Observable applications = applicationMigrationsRx
+ .doOnNext(new Action1<ApplicationDataMigration>() {
@Override
- public void call(Id id) {
- ApplicationScope scope = new ApplicationScopeImpl(id);
- runMigration(dataMigration, observer, new ApplicationEntityGroup(scope, null));
+ public void call(ApplicationDataMigration dataMigration) {
+
+ migrationInfoSerialization.setStatusCode(StatusCode.RUNNING.status);
+
+ final int migrationVersion = dataMigration.getVersion();
+
+ LOG.info("Running migration version {}", migrationVersion);
+
+ observer.update(migrationVersion, "Starting migration");
+
+ //perform this migration, if it fails, short circuit
+ try {
+ dataMigration.migrate(appScopeObservable, observer).toBlocking().lastOrDefault(null);
+ } catch (Throwable throwable) {
+ observer.failed(migrationVersion, "Exception thrown during migration", throwable);
+ LOG.error("Unable to migrate to version {}.", migrationVersion, throwable);
+ throw new RuntimeException(throwable);
+ }
+
+ //we had an unhandled exception or the migration failed, short circuit
+ if (observer.failed) {
+ return;
+ }
+
+ //set the version
+ migrationInfoSerialization.setVersion(migrationVersion);
+
+ //update the observer for progress so other nodes can see it
+ observer.update(migrationVersion, "Completed successfully");
+
+ }
+ });
+
+
+ Observable entities =collectionMigrationsRx
+ .doOnNext(new Action1<CollectionDataMigration>() {
+ @Override
+ public void call(CollectionDataMigration dataMigration) {
+ migrationInfoSerialization.setStatusCode(StatusCode.RUNNING.status);
+
+ final int migrationVersion = dataMigration.getVersion();
+
+ LOG.info("Running migration version {}", migrationVersion);
+
+ observer.update(migrationVersion, "Starting migration");
+
+ //perform this migration, if it fails, short circuit
+ try {
+ dataMigration.migrate(entitiesObservable, observer).toBlocking().lastOrDefault(null);
+ } catch (Throwable throwable) {
+ observer.failed(migrationVersion, "Exception thrown during migration", throwable);
+ LOG.error("Unable to migrate to version {}.", migrationVersion, throwable);
+ throw new RuntimeException(throwable);
+ }
+
+ //we had an unhandled exception or the migration failed, short circuit
+ if (observer.failed) {
+ return;
+ }
+
+ //set the version
+ migrationInfoSerialization.setVersion(migrationVersion);
+
+ //update the observer for progress so other nodes can see it
+ observer.update(migrationVersion, "Completed successfully");
}
});
- }
- });
try {
Observable
- .merge(entityMigrations, otherMigrations)
+ .merge(applications, entities)
.subscribeOn(Schedulers.io())
.toBlocking().lastOrDefault(null);
migrationInfoSerialization.setStatusCode( StatusCode.COMPLETE.status );
} catch (Exception e){
- LOG.error("Migration Failed");
+ LOG.error("Migration Failed",e);
}
}
- private void runMigration(DataMigration migration, CassandraProgressObserver observer, ApplicationEntityGroup applicationEntityGroup) {
- migrationInfoSerialization.setStatusCode(StatusCode.RUNNING.status);
- final int migrationVersion = migration.getVersion();
- LOG.info("Running migration version {}", migrationVersion);
+ private boolean populateTreeMap() {
+ if ( migrationTreeMap.isEmpty() ) {
+ for ( DataMigration migration : appMigrations ) {
- observer.update(migrationVersion, "Starting migration");
+ Preconditions.checkNotNull(migration,
+ "A migration instance in the set of migrations was null. This is not allowed");
- //perform this migration, if it fails, short circuit
- try {
- migration.migrate(applicationEntityGroup, observer).toBlocking().lastOrDefault(null);
- } catch (Throwable throwable) {
- observer.failed(migrationVersion, "Exception thrown during migration", throwable);
- LOG.error("Unable to migrate to version {}.", migrationVersion, throwable);
- throw new RuntimeException(throwable) ;
- }
+ final int version = migration.getVersion();
- //we had an unhandled exception or the migration failed, short circuit
- if (observer.failed) {
- return ;
- }
+ final DataMigration existing = migrationTreeMap.get( version );
+
+ if ( existing != null ) {
- //set the version
- migrationInfoSerialization.setVersion(migrationVersion);
+ final Class<? extends DataMigration> existingClass = existing.getClass();
- //update the observer for progress so other nodes can see it
- observer.update(migrationVersion, "Completed successfully");
- }
+ final Class<? extends DataMigration> currentClass = migration.getClass();
- private boolean populateTreeMap() {
- if ( migrationTreeMap.isEmpty() ) {
- for ( DataMigration migration : migrations ) {
+
+ throw new DataMigrationException( String.format(
+ "Data migrations must be unique. Both classes %s and %s have version %d",
+ existingClass, currentClass, version ) );
+ }
+
+ migrationTreeMap.put( version, migration );
+ }
+
+ for ( DataMigration migration : collectionMigrations ) {
Preconditions.checkNotNull(migration,
"A migration instance in the set of migrations was null. This is not allowed");
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
index 141b6c1..e029f92 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
@@ -43,10 +43,10 @@ public interface AllEntitiesInSystemObservable<T extends ApplicationScope> {
* Return an observable that emits all entities in the system.
*
* @param appIdObservable list of app ids
- * @param bufferSize The amount of entityIds to buffer into each ApplicationEntityGroup. Note that if we exceed the buffer size
- * you may be more than 1 ApplicationEntityGroup with the same application and different ids
+ * @param bufferSize The amount of entityIds to buffer into each ApplicationEntityGroup. Note that if we exceed the buffer size
+ * you may get more than 1 ApplicationEntityGroup with the same application and different ids
*/
- public Observable<ApplicationEntityGroup<T>> getAllEntitiesInSystem(Observable<Id> appIdObservable, final int bufferSize);
+ public Observable<ApplicationEntityGroup<T>> getAllEntitiesInSystem(Observable<ApplicationScope> appIdObservable, final int bufferSize);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ApplicationObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ApplicationObservable.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ApplicationObservable.java
index 09e888d..a229eb6 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ApplicationObservable.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ApplicationObservable.java
@@ -19,6 +19,7 @@
*/
package org.apache.usergrid.persistence.core.rx;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Id;
import rx.Observable;
@@ -30,4 +31,9 @@ public interface ApplicationObservable {
* Get all applicationIds as an observable
*/
Observable<Id> getAllApplicationIds();
+
+ /**
+ * get all application scopes
+ */
+ Observable<ApplicationScope> getAllApplicationScopes();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationModule.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationModule.java
index 2d91492..4e9ef90 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationModule.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationModule.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.persistence.core.guice;
+import org.apache.usergrid.persistence.core.migration.data.ApplicationDataMigration;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import com.google.inject.AbstractModule;
@@ -33,7 +34,7 @@ import com.google.inject.multibindings.Multibinder;
public class MaxMigrationModule extends AbstractModule {
@Override
protected void configure() {
- Multibinder<DataMigration> dataMigrationMultibinder = Multibinder.newSetBinder( binder(), DataMigration.class );
+ Multibinder<ApplicationDataMigration> dataMigrationMultibinder = Multibinder.newSetBinder( binder(), ApplicationDataMigration.class );
dataMigrationMultibinder.addBinding().to( MaxMigrationVersion.class );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
index d903a7d..d3b9b49 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
@@ -20,23 +20,24 @@
package org.apache.usergrid.persistence.core.guice;
+import org.apache.usergrid.persistence.core.migration.data.ApplicationDataMigration;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import rx.Observable;
/**
* A simple migration that sets the version to max. This way our integration tests always test the latest code
*/
-public class MaxMigrationVersion implements DataMigration {
+public class MaxMigrationVersion implements ApplicationDataMigration {
@Override
- public Observable migrate(final ApplicationEntityGroup applicationEntityGroup, final ProgressObserver observer ) throws Throwable {
+ public Observable migrate(final Observable<ApplicationScope> applicationEntityGroup, final ProgressObserver observer) {
//no op, just needs to run to be set
return Observable.empty();
}
-
@Override
public int getVersion() {
return Integer.MAX_VALUE;
@@ -44,6 +45,6 @@ public class MaxMigrationVersion implements DataMigration {
@Override
public MigrationType getType() {
- return MigrationType.Other;
+ return MigrationType.Applications;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
index 0c7e76b..16092b5 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
@@ -28,6 +28,7 @@ import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.core.scope.EntityIdScope;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
@@ -75,6 +76,11 @@ public class DataMigrationManagerImplTest {
public Observable<Id> getAllApplicationIds() {
return Observable.just( (Id)new SimpleId("application"));
}
+
+ @Override
+ public Observable<ApplicationScope> getAllApplicationScopes() {
+ return Observable.just( (ApplicationScope)new ApplicationScopeImpl((Id)new SimpleId("application")));
+ }
};
@Test
@@ -82,9 +88,10 @@ public class DataMigrationManagerImplTest {
final MigrationInfoSerialization serialization = mock( MigrationInfoSerialization.class );
when(serialization.getCurrentVersion()).thenReturn(1);
- Set<DataMigration> emptyMigration = new HashSet<>();
+ Set<CollectionDataMigration> collectionDataMigrations = new HashSet<>();
+ Set<ApplicationDataMigration> emptyMigration = new HashSet<>();
- DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, emptyMigration, allEntitiesInSystemObservable,allApplicationsObservable );
+ DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, emptyMigration,collectionDataMigrations, allEntitiesInSystemObservable,allApplicationsObservable );
migrationManager.migrate();
@@ -100,27 +107,29 @@ public class DataMigrationManagerImplTest {
when(serialization.getCurrentVersion()).thenReturn(1);
- final DataMigration v1 = mock( DataMigration.class );
+ final ApplicationDataMigration v1 = mock( ApplicationDataMigration.class );
when( v1.getVersion() ).thenReturn( 2 );
- when( v1.migrate(any(ApplicationEntityGroup.class), any(DataMigration.ProgressObserver.class))).thenReturn(Observable.empty());
+ when( v1.migrate(any(Observable.class), any(DataMigration.ProgressObserver.class))).thenReturn(Observable.empty());
- final DataMigration v2 = mock( DataMigration.class );
+ final ApplicationDataMigration v2 = mock( ApplicationDataMigration.class );
when( v2.getVersion() ).thenReturn( 3 );
- when(v2.migrate(any(ApplicationEntityGroup.class), any(DataMigration.ProgressObserver.class))).thenReturn(Observable.empty());
+ when(v2.migrate(any(Observable.class), any(DataMigration.ProgressObserver.class))).thenReturn(Observable.empty());
- Set<DataMigration> migrations = new HashSet<>();
+ Set<ApplicationDataMigration> migrations = new HashSet<>();
migrations.add( v1 );
migrations.add( v2 );
+ Set<CollectionDataMigration> collectionDataMigrations = new HashSet<>();
- DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, migrations,allEntitiesInSystemObservable,allApplicationsObservable );
+
+ DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, migrations,collectionDataMigrations,allEntitiesInSystemObservable,allApplicationsObservable );
migrationManager.migrate();
- verify( v1 ).migrate(any(ApplicationEntityGroup.class), any( DataMigration.ProgressObserver.class ) );
- verify( v2 ).migrate(any(ApplicationEntityGroup.class), any( DataMigration.ProgressObserver.class ) );
+ verify( v1 ).migrate(any(Observable.class), any( DataMigration.ProgressObserver.class ) );
+ verify( v2 ).migrate(any(Observable.class), any( DataMigration.ProgressObserver.class ) );
//verify we set the running status
verify( serialization, times( 2 ) ).setStatusCode( DataMigrationManagerImpl.StatusCode.RUNNING.status );
@@ -144,33 +153,35 @@ public class DataMigrationManagerImplTest {
when(serialization.getCurrentVersion()).thenReturn(1);
- final DataMigration v1 = mock( DataMigration.class,"mock1" );
+ final ApplicationDataMigration v1 = mock( ApplicationDataMigration.class,"mock1" );
when( v1.getVersion() ).thenReturn( 2 );
when( v1.getType() ).thenReturn(DataMigration.MigrationType.Entities);
- when( v1.migrate(any(ApplicationEntityGroup.class), any(DataMigration.ProgressObserver.class))).thenReturn(Observable.empty());
+ when( v1.migrate(any(Observable.class), any(DataMigration.ProgressObserver.class))).thenReturn(Observable.empty());
//throw an exception
- when( v1.migrate(any(ApplicationEntityGroup.class),
+ when( v1.migrate(any(Observable.class),
any(DataMigration.ProgressObserver.class) )).thenThrow(new RuntimeException( "Something bad happened" ));
- final DataMigration v2 = mock( DataMigration.class,"mock2" );
+ final ApplicationDataMigration v2 = mock( ApplicationDataMigration.class,"mock2" );
when( v2.getType() ).thenReturn(DataMigration.MigrationType.Entities);
when( v2.getVersion() ).thenReturn( 3 );
- Set<DataMigration> migrations = new HashSet<>();
+ Set<ApplicationDataMigration> migrations = new HashSet<>();
migrations.add( v1 );
migrations.add( v2 );
+ Set<CollectionDataMigration> collectionDataMigrations = new HashSet<>();
- DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, migrations,allEntitiesInSystemObservable,allApplicationsObservable );
+ DataMigrationManagerImpl migrationManager
+ = new DataMigrationManagerImpl( serialization, migrations,collectionDataMigrations,allEntitiesInSystemObservable,allApplicationsObservable );
migrationManager.migrate();
- verify( v1 ).migrate( any(ApplicationEntityGroup.class),any( DataMigration.ProgressObserver.class ) );
+ verify( v1 ).migrate( any(Observable.class),any( DataMigration.ProgressObserver.class ) );
//verify we don't run migration
- verify( v2, never() ).migrate( any(ApplicationEntityGroup.class),any( DataMigration.ProgressObserver.class ) );
+ verify( v2, never() ).migrate( any(Observable.class),any( DataMigration.ProgressObserver.class ) );
//verify we set the running status
verify( serialization, times( 1 ) ).setStatusCode( DataMigrationManagerImpl.StatusCode.RUNNING.status );
@@ -194,17 +205,17 @@ public class DataMigrationManagerImplTest {
final MigrationInfoSerialization serialization = mock(MigrationInfoSerialization.class);
when(serialization.getCurrentVersion()).thenReturn(1);
- final DataMigration v1 = mock( DataMigration.class );
+ final CollectionDataMigration v1 = mock( CollectionDataMigration.class );
when( v1.getVersion() ).thenReturn( 2 );
when( v1.getType() ).thenReturn(DataMigration.MigrationType.Entities);
- when( v1.migrate(any(ApplicationEntityGroup.class), any(DataMigration.ProgressObserver.class))).thenReturn(Observable.empty());
+ when( v1.migrate(any(Observable.class), any(DataMigration.ProgressObserver.class))).thenReturn(Observable.empty());
final int returnedCode = 100;
final String reason = "test reason";
//mark as fail but don't
- when(v1.migrate(any(ApplicationEntityGroup.class), any(DataMigration.ProgressObserver.class))).thenAnswer(
+ when(v1.migrate(any(Observable.class), any(DataMigration.ProgressObserver.class))).thenAnswer(
new Answer<Object>() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
@@ -218,25 +229,26 @@ public class DataMigrationManagerImplTest {
);
- final DataMigration v2 = mock( DataMigration.class );
+ final CollectionDataMigration v2 = mock( CollectionDataMigration.class );
when( v2.getVersion() ).thenReturn( 3 );
when( v2.getType() ).thenReturn(DataMigration.MigrationType.Entities);
- when(v2.migrate(any(ApplicationEntityGroup.class), any(DataMigration.ProgressObserver.class))).thenReturn(Observable.empty());
+ when(v2.migrate(any(Observable.class), any(DataMigration.ProgressObserver.class))).thenReturn(Observable.empty());
- Set<DataMigration> migrations = new HashSet<>();
- migrations.add( v1 );
- migrations.add( v2 );
+ Set<CollectionDataMigration> collectionMigrations = new HashSet<>();
+ collectionMigrations.add( v1 );
+ collectionMigrations.add(v2);
+ Set<ApplicationDataMigration> applicationDataMigrations = new HashSet<>();
- DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, migrations,allEntitiesInSystemObservable, allApplicationsObservable );
+ DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, applicationDataMigrations,collectionMigrations,allEntitiesInSystemObservable, allApplicationsObservable );
migrationManager.migrate();
- verify( v1 ).migrate(any(ApplicationEntityGroup.class), any( DataMigration.ProgressObserver.class ) );
+ verify( v1 ).migrate(any(Observable.class), any( DataMigration.ProgressObserver.class ) );
//verify we don't run migration
- verify( v2, never() ).migrate( any(ApplicationEntityGroup.class),any( DataMigration.ProgressObserver.class ) );
+ verify( v2, never() ).migrate( any(Observable.class),any( DataMigration.ProgressObserver.class ) );
//verify we set the running status
verify( serialization, times( 1 ) ).setStatusCode( DataMigrationManagerImpl.StatusCode.RUNNING.status );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ApplicationsTestObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ApplicationsTestObservable.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ApplicationsTestObservable.java
index c0d52dd..b798c41 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ApplicationsTestObservable.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ApplicationsTestObservable.java
@@ -19,6 +19,7 @@
*/
package org.apache.usergrid.persistence.core.rx;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Id;
import rx.Observable;
@@ -30,4 +31,9 @@ public class ApplicationsTestObservable implements ApplicationObservable {
public Observable<Id> getAllApplicationIds() {
return Observable.empty();
}
+
+ @Override
+ public Observable<ApplicationScope> getAllApplicationScopes() {
+ return Observable.empty();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 5e4a300..b695512 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.graph.guice;
import org.apache.usergrid.persistence.core.guice.V1Impl;
import org.apache.usergrid.persistence.core.guice.V2Impl;
+import org.apache.usergrid.persistence.core.migration.data.ApplicationDataMigration;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.graph.serialization.*;
import org.apache.usergrid.persistence.graph.serialization.impl.*;
@@ -116,8 +117,8 @@ public class GraphModule extends AbstractModule {
bind( EdgeMetaRepair.class ).to( EdgeMetaRepairImpl.class );
bind( EdgeDeleteRepair.class ).to( EdgeDeleteRepairImpl.class );
- Multibinder<DataMigration> dataMigrationMultibinder =
- Multibinder.newSetBinder( binder(), DataMigration.class );
+ Multibinder<ApplicationDataMigration> dataMigrationMultibinder =
+ Multibinder.newSetBinder( binder(), ApplicationDataMigration.class );
dataMigrationMultibinder.addBinding().to( EdgeDataMigrationImpl.class );
/**
@@ -130,11 +131,11 @@ public class GraphModule extends AbstractModule {
bind( EdgeShardStrategy.class ).to( SizebasedEdgeShardStrategy.class );
- bind( ShardedEdgeSerialization.class ).to( ShardedEdgeSerializationImpl.class );
+ bind(ShardedEdgeSerialization.class ).to( ShardedEdgeSerializationImpl.class );
bind( EdgeColumnFamilies.class ).to( SizebasedEdgeColumnFamilies.class );
- bind( ShardGroupCompaction.class ).to( ShardGroupCompactionImpl.class );
+ bind(ShardGroupCompaction.class ).to( ShardGroupCompactionImpl.class );
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/79ec9787/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
index 975e1a2..7d4fb2e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
@@ -23,9 +23,10 @@ import com.google.inject.Inject;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import org.apache.usergrid.persistence.core.migration.data.ApplicationDataMigration;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.scope.EntityIdScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -44,7 +45,7 @@ import java.util.concurrent.atomic.AtomicLong;
* Encapsulates data mi
*/
-public class EdgeDataMigrationImpl implements DataMigration {
+public class EdgeDataMigrationImpl implements ApplicationDataMigration {
private static final Logger logger = LoggerFactory.getLogger(EdgeDataMigrationImpl.class);
@@ -65,73 +66,62 @@ public class EdgeDataMigrationImpl implements DataMigration {
this.edgesFromSourceObservable = edgesFromSourceObservable;
this.edgeMigrationStrategy = edgeMigrationStrategy;
}
- @Override
- public Observable<Long> migrate(final ApplicationEntityGroup applicationEntityGroup,
- final DataMigration.ProgressObserver observer) {
- final GraphManager gm = graphManagerFactory.createEdgeManager(applicationEntityGroup.applicationScope);
- final Observable<Edge> edgesFromSource = edgesFromSourceObservable.edgesFromSource(gm, applicationEntityGroup.applicationScope.getApplication());
+
+ @Override
+ public Observable migrate(final Observable<ApplicationScope> scopes,
+ final DataMigration.ProgressObserver observer) {
final AtomicLong counter = new AtomicLong();
- rx.Observable o =
- Observable
- .from(applicationEntityGroup.entityIds)
- .flatMap(new Func1<EntityIdScope, Observable<List<Edge>>>() {
- //for each id in the group, get it's edges
- @Override
- public Observable<List<Edge>> call(final EntityIdScope idScope) {
- logger.info("Migrating edges from node {} in scope {}", idScope.getId(),
- applicationEntityGroup.applicationScope);
-
- //get each edge from this node as a source
- return edgesFromSource
-
- //for each edge, re-index it in v2 every 1000 edges or less
- .buffer(1000)
- .doOnNext(new Action1<List<Edge>>() {
- @Override
- public void call(List<Edge> edges) {
- final MutationBatch batch =
- keyspace.prepareMutationBatch();
-
- for (Edge edge : edges) {
- logger.info("Migrating meta for edge {}", edge);
- final MutationBatch edgeBatch = edgeMigrationStrategy.getMigration().to()
- .writeEdge(
- applicationEntityGroup
- .applicationScope,
- edge);
- batch.mergeShallow(edgeBatch);
- }
-
- try {
- batch.execute();
- } catch (ConnectionException e) {
- throw new RuntimeException(
- "Unable to perform migration", e);
- }
-
- //update the observer so the admin can see it
- final long newCount =
- counter.addAndGet(edges.size());
-
- observer.update(getVersion(), String.format(
- "Currently running. Rewritten %d edge types",
- newCount));
- }
- });
- }
-
-
- })
- .map(new Func1<List<Edge>, Long>() {
- @Override
- public Long call(List<Edge> edges) {
- return counter.get();
- }
- });
- return o;
+
+ return scopes.flatMap(new Func1<ApplicationScope, Observable<?>>() {
+ @Override
+ public Observable call(final ApplicationScope applicationScope) {
+ final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope);
+ final Observable<Edge> edgesFromSource = edgesFromSourceObservable.edgesFromSource(gm, applicationScope.getApplication());
+ logger.info("Migrating edges scope {}", applicationScope);
+
+ //get each edge from this node as a source
+ return edgesFromSource
+
+ //for each edge, re-index it in v2 every 1000 edges or less
+ .buffer(1000)
+ .doOnNext(new Action1<List<Edge>>() {
+ @Override
+ public void call(List<Edge> edges) {
+ final MutationBatch batch =
+ keyspace.prepareMutationBatch();
+
+ for (Edge edge : edges) {
+ logger.info("Migrating meta for edge {}", edge);
+ final MutationBatch edgeBatch = edgeMigrationStrategy.getMigration().to()
+ .writeEdge(applicationScope,
+ edge);
+ batch.mergeShallow(edgeBatch);
+ }
+
+ try {
+ batch.execute();
+ } catch (ConnectionException e) {
+ throw new RuntimeException(
+ "Unable to perform migration", e);
+ }
+
+ //update the observer so the admin can see it
+ final long newCount =
+ counter.addAndGet(edges.size());
+
+ observer.update(getVersion(), String.format(
+ "Currently running. Rewritten %d edge types",
+ newCount));
+ }
+ });
+ }
+ });
+
}
+
+
@Override
public int getVersion() {
return edgeMigrationStrategy.getVersion();
@@ -139,6 +129,6 @@ public class EdgeDataMigrationImpl implements DataMigration {
@Override
public MigrationType getType() {
- return MigrationType.Edges;
+ return MigrationType.Applications;
}
}