You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/19 23:19:55 UTC
[05/50] [abbrv] incubator-usergrid git commit: Updated entity
migration to run in parallel
Updated entity migration to run in parallel
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/95897754
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/95897754
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/95897754
Branch: refs/heads/USERGRID-493
Commit: 95897754a083debeae5fd1733836f7905baec76f
Parents: 787bc09
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Feb 26 16:01:59 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Feb 26 16:01:59 2015 -0700
----------------------------------------------------------------------
.../impl/MvccEntityDataMigrationImpl.java | 150 +++++++++++--------
.../impl/EntityVersionCreatedTaskTest.java | 2 -
.../core/migration/data/DataMigration.java | 3 +-
.../core/scope/ApplicationEntityGroup.java | 4 +-
4 files changed, 95 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/95897754/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java
index 4c46769..4dfc64b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java
@@ -19,10 +19,12 @@
*/
package org.apache.usergrid.persistence.collection.serialization.impl;
-import com.google.inject.Inject;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.EntitySet;
import org.apache.usergrid.persistence.collection.MvccEntity;
@@ -33,20 +35,20 @@ import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
import org.apache.usergrid.persistence.core.migration.schema.MigrationStrategy;
import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.EntityIdScope;
-import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
+import rx.functions.Func2;
import rx.schedulers.Schedulers;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
/**
* Data migration strategy for entities
@@ -56,74 +58,102 @@ public class MvccEntityDataMigrationImpl implements CollectionDataMigration {
private final Keyspace keyspace;
private final MvccEntityMigrationStrategy entityMigrationStrategy;
+
@Inject
- public MvccEntityDataMigrationImpl(Keyspace keyspace, MvccEntityMigrationStrategy serializationStrategy){
+ public MvccEntityDataMigrationImpl( Keyspace keyspace, MvccEntityMigrationStrategy serializationStrategy ) {
this.keyspace = keyspace;
this.entityMigrationStrategy = serializationStrategy;
}
+
@Override
- public Observable migrate(final Observable<ApplicationEntityGroup> applicationEntityGroupObservable, final DataMigration.ProgressObserver observer) {
+ public Observable migrate( final Observable<ApplicationEntityGroup> applicationEntityGroupObservable,
+ final DataMigration.ProgressObserver observer ) {
final AtomicLong atomicLong = new AtomicLong();
- final MutationBatch totalBatch = keyspace.prepareMutationBatch();
+ //capture the time the migration starts
final UUID now = UUIDGenerator.newTimeUUID();
- return applicationEntityGroupObservable.flatMap(new Func1<ApplicationEntityGroup, Observable<Id>>() {
+ return applicationEntityGroupObservable.flatMap( new Func1<ApplicationEntityGroup, Observable<Long>>() {
@Override
- public Observable call(final ApplicationEntityGroup applicationEntityGroup) {
+ public Observable<Long> call( final ApplicationEntityGroup applicationEntityGroup ) {
final List<EntityIdScope<CollectionScope>> entityIds = applicationEntityGroup.entityIds;
//go through each entity in the system, and load it's entire
// history
- return Observable.from(entityIds)
- .subscribeOn(Schedulers.io())
- .map(new Func1<EntityIdScope<CollectionScope>, Id>() {
- @Override
- public Id call(EntityIdScope<CollectionScope> idScope) {
-
- MigrationStrategy.MigrationRelationship<MvccEntitySerializationStrategy> migration = entityMigrationStrategy.getMigration();
-
-
- CollectionScope currentScope = idScope.getCollectionScope();
- //for each element in the history in the previous version,
- // copy it to the CF in v2
- EntitySet allVersions = migration.from()
- .load( currentScope, Collections.singleton( idScope.getId() ), now );
-
- final MvccEntity version = allVersions.getEntity( idScope.getId() );
-
- final MutationBatch versionBatch =
- migration.to().write(currentScope, version);
-
- totalBatch.mergeShallow(versionBatch);
-
- throw new UnsupportedOperationException( "TODO, make this more functional in flushing" );
-
-// if (atomicLong.incrementAndGet() % 50 == 0) {
-// executeBatch(totalBatch, observer, atomicLong);
-// }
-
-// executeBatch(totalBatch, observer, atomicLong);
-
-// return idScope.getId();
- }
-
-
- } ).buffer(100).doOnNext( new Action1<List<Id>>() {
- @Override
- public void call( List<Id> ids ) {
- executeBatch( totalBatch, observer, atomicLong );
- }
- });
+ return Observable.from( entityIds ).subscribeOn( Schedulers.io() )
+ .parallel( new Func1<Observable<EntityIdScope<CollectionScope>>, Observable<Long>>() {
+
+
+ //process the ids in parallel
+ @Override
+ public Observable<Long> call(
+ final Observable<EntityIdScope<CollectionScope>> entityIdScopeObservable
+ ) {
+
+ final MutationBatch totalBatch = keyspace.prepareMutationBatch();
+
+ return entityIdScopeObservable.doOnNext(
+ new Action1<EntityIdScope<CollectionScope>>() {
+
+ //load the entity and add it to the total mutation
+ @Override
+ public void call( final EntityIdScope<CollectionScope> idScope ) {
+
+ //load the entity
+ MigrationStrategy
+ .MigrationRelationship<MvccEntitySerializationStrategy>
+ migration = entityMigrationStrategy.getMigration();
+
+
+ CollectionScope currentScope = idScope.getCollectionScope();
+ //for each element in the history in the previous
+ // version,
+ // copy it to the CF in v2
+ EntitySet allVersions = migration.from().load( currentScope,
+ Collections.singleton( idScope.getId() ), now );
+
+ final MvccEntity version =
+ allVersions.getEntity( idScope.getId() );
+
+ final MutationBatch versionBatch =
+ migration.to().write( currentScope, version );
+
+ totalBatch.mergeShallow( versionBatch );
+ }
+ } )
+ //every 100 flush the mutation
+ .buffer( 100 ).doOnNext(
+ new Action1<List<EntityIdScope<CollectionScope>>>() {
+ @Override
+ public void call(
+ final List<EntityIdScope<CollectionScope>> ids ) {
+ atomicLong.addAndGet( 100 );
+ executeBatch( totalBatch, observer, atomicLong );
+ }
+ } )
+ //count the results
+ .reduce( 0l,
+ new Func2<Long, List<EntityIdScope<CollectionScope>>, Long>() {
+ @Override
+ public Long call( final Long aLong,
+ final
+ List<EntityIdScope<CollectionScope>>
+ ids ) {
+ return aLong + ids.size();
+ }
+ } );
+ }
+ } );
}
- });
-
+ } );
}
- protected void executeBatch( final MutationBatch batch, final DataMigration.ProgressObserver po, final AtomicLong count ) {
+
+ protected void executeBatch( final MutationBatch batch, final DataMigration.ProgressObserver po,
+ final AtomicLong count ) {
try {
batch.execute();
@@ -135,11 +165,13 @@ public class MvccEntityDataMigrationImpl implements CollectionDataMigration {
}
}
+
@Override
public int getVersion() {
return entityMigrationStrategy.getVersion();
}
+
@Override
public MigrationType getType() {
return MigrationType.Entities;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/95897754/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
index 90055a4..ade58f6 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
@@ -59,8 +59,6 @@ public class EntityVersionCreatedTaskTest {
// create a latch for the event listener, and add it to the list of events
- final int sizeToReturn = 0;
-
final Set<EntityVersionCreated> listeners = mock( Set.class );
when ( listeners.size()).thenReturn( 0 );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/95897754/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
index 5b04466..e1e3092 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
@@ -46,9 +46,10 @@ public interface DataMigration <T> {
/**
* Migrate the data to the specified version
* @param observer
+ * @return an observable containing the count of the number of elements migrated
* @throws Throwable
*/
- public Observable migrate(final Observable<T> applicationEntityGroup,final ProgressObserver observer) throws Throwable;
+ public Observable<Long> migrate(final Observable<T> applicationEntityGroup,final ProgressObserver observer) throws Throwable;
/**
* Get the version of this migration. It must be unique.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/95897754/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
index 98a21dd..d0d81c7 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
@@ -28,10 +28,10 @@ import java.util.List;
* Get the entity data. Immutable bean for fast access
*/
public final class ApplicationEntityGroup<T extends ApplicationScope> {
- public final ApplicationScope applicationScope;
+ public final T applicationScope;
public final List<EntityIdScope<T>> entityIds;
- public ApplicationEntityGroup(final ApplicationScope applicationScope, final List<EntityIdScope<T>> entityIds) {
+ public ApplicationEntityGroup(final T applicationScope, final List<EntityIdScope<T>> entityIds) {
this.applicationScope = applicationScope;
this.entityIds = entityIds;
}