You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/02/27 00:02:01 UTC

incubator-usergrid git commit: Updated entity migration to run in parallel

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-405 787bc0913 -> 95897754a


Updated entity migration to run in parallel


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/95897754
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/95897754
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/95897754

Branch: refs/heads/USERGRID-405
Commit: 95897754a083debeae5fd1733836f7905baec76f
Parents: 787bc09
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Feb 26 16:01:59 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Feb 26 16:01:59 2015 -0700

----------------------------------------------------------------------
 .../impl/MvccEntityDataMigrationImpl.java       | 150 +++++++++++--------
 .../impl/EntityVersionCreatedTaskTest.java      |   2 -
 .../core/migration/data/DataMigration.java      |   3 +-
 .../core/scope/ApplicationEntityGroup.java      |   4 +-
 4 files changed, 95 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/95897754/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java
index 4c46769..4dfc64b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java
@@ -19,10 +19,12 @@
  */
 package org.apache.usergrid.persistence.collection.serialization.impl;
 
-import com.google.inject.Inject;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntitySet;
 import org.apache.usergrid.persistence.collection.MvccEntity;
@@ -33,20 +35,20 @@ import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
 import org.apache.usergrid.persistence.core.migration.schema.MigrationStrategy;
 import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.EntityIdScope;
-import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
 import rx.Observable;
 import rx.functions.Action1;
 import rx.functions.Func1;
+import rx.functions.Func2;
 import rx.schedulers.Schedulers;
 
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Data migration strategy for entities
@@ -56,74 +58,102 @@ public class MvccEntityDataMigrationImpl implements CollectionDataMigration {
     private final Keyspace keyspace;
     private final MvccEntityMigrationStrategy entityMigrationStrategy;
 
+
     @Inject
-    public MvccEntityDataMigrationImpl(Keyspace keyspace, MvccEntityMigrationStrategy serializationStrategy){
+    public MvccEntityDataMigrationImpl( Keyspace keyspace, MvccEntityMigrationStrategy serializationStrategy ) {
 
         this.keyspace = keyspace;
         this.entityMigrationStrategy = serializationStrategy;
     }
 
+
     @Override
-    public Observable migrate(final  Observable<ApplicationEntityGroup> applicationEntityGroupObservable, final DataMigration.ProgressObserver observer) {
+    public Observable migrate( final Observable<ApplicationEntityGroup> applicationEntityGroupObservable,
+                               final DataMigration.ProgressObserver observer ) {
         final AtomicLong atomicLong = new AtomicLong();
-        final MutationBatch totalBatch = keyspace.prepareMutationBatch();
 
 
+        //capture the time the migration starts
         final UUID now = UUIDGenerator.newTimeUUID();
 
-        return applicationEntityGroupObservable.flatMap(new Func1<ApplicationEntityGroup, Observable<Id>>() {
+        return applicationEntityGroupObservable.flatMap( new Func1<ApplicationEntityGroup, Observable<Long>>() {
             @Override
-            public Observable call(final ApplicationEntityGroup applicationEntityGroup) {
+            public Observable<Long> call( final ApplicationEntityGroup applicationEntityGroup ) {
                 final List<EntityIdScope<CollectionScope>> entityIds = applicationEntityGroup.entityIds;
 
                 //go through each entity in the system, and load it's entire
                 // history
-                return Observable.from(entityIds)
-                    .subscribeOn(Schedulers.io())
-                    .map(new Func1<EntityIdScope<CollectionScope>, Id>() {
-                        @Override
-                        public Id call(EntityIdScope<CollectionScope> idScope) {
-
-                            MigrationStrategy.MigrationRelationship<MvccEntitySerializationStrategy> migration = entityMigrationStrategy.getMigration();
-
-
-                            CollectionScope currentScope = idScope.getCollectionScope();
-                            //for each element in the history in the previous version,
-                            // copy it to the CF in v2
-                            EntitySet allVersions = migration.from()
-                                .load( currentScope, Collections.singleton( idScope.getId() ), now );
-
-                            final MvccEntity version = allVersions.getEntity( idScope.getId() );
-
-                           final MutationBatch versionBatch =
-                               migration.to().write(currentScope, version);
-
-                           totalBatch.mergeShallow(versionBatch);
-
-                            throw new UnsupportedOperationException( "TODO, make this more functional in flushing" );
-
-//                               if (atomicLong.incrementAndGet() % 50 == 0) {
-//                                   executeBatch(totalBatch, observer, atomicLong);
-//                               }
-
-//                                executeBatch(totalBatch, observer, atomicLong);
-
-//      return idScope.getId();
-                        }
-
-
-                    } ).buffer(100).doOnNext( new Action1<List<Id>>() {
-                        @Override
-                        public void call( List<Id> ids ) {
-                            executeBatch( totalBatch, observer, atomicLong );
-                        }
-                    });
+                return Observable.from( entityIds ).subscribeOn( Schedulers.io() )
+                                 .parallel( new Func1<Observable<EntityIdScope<CollectionScope>>, Observable<Long>>() {
+
+
+                                     //process the ids in parallel
+                                     @Override
+                                     public Observable<Long> call(
+                                             final Observable<EntityIdScope<CollectionScope>> entityIdScopeObservable
+                                                                 ) {
+
+                                         final MutationBatch totalBatch = keyspace.prepareMutationBatch();
+
+                                         return entityIdScopeObservable.doOnNext(
+                                                 new Action1<EntityIdScope<CollectionScope>>() {
+
+                                                     //load the entity and add it to the total mutation
+                                                     @Override
+                                                     public void call( final EntityIdScope<CollectionScope> idScope ) {
+
+                                                         //load the entity
+                                                         MigrationStrategy
+                                                                 .MigrationRelationship<MvccEntitySerializationStrategy>
+                                                                 migration = entityMigrationStrategy.getMigration();
+
+
+                                                         CollectionScope currentScope = idScope.getCollectionScope();
+                                                         //for each element in the history in the previous
+                                                         // version,
+                                                         // copy it to the CF in v2
+                                                         EntitySet allVersions = migration.from().load( currentScope,
+                                                                 Collections.singleton( idScope.getId() ), now );
+
+                                                         final MvccEntity version =
+                                                                 allVersions.getEntity( idScope.getId() );
+
+                                                         final MutationBatch versionBatch =
+                                                                 migration.to().write( currentScope, version );
+
+                                                         totalBatch.mergeShallow( versionBatch );
+                                                     }
+                                                 } )
+                                                 //flush the mutation after every 100 entities
+                                                 .buffer( 100 ).doOnNext(
+                                                         new Action1<List<EntityIdScope<CollectionScope>>>() {
+                                                             @Override
+                                                             public void call(
+                                                                     final List<EntityIdScope<CollectionScope>> ids ) {
+                                                                 atomicLong.addAndGet( 100 );
+                                                                 executeBatch( totalBatch, observer, atomicLong );
+                                                             }
+                                                         } )
+                                                         //count the results
+                                                 .reduce( 0l,
+                                                         new Func2<Long, List<EntityIdScope<CollectionScope>>, Long>() {
+                                                             @Override
+                                                             public Long call( final Long aLong,
+                                                                               final
+                                                                               List<EntityIdScope<CollectionScope>>
+                                                                                       ids ) {
+                                                                 return aLong + ids.size();
+                                                             }
+                                                         } );
+                                     }
+                                 } );
             }
-        });
-
+        } );
     }
 
-    protected void executeBatch( final MutationBatch batch, final DataMigration.ProgressObserver po, final AtomicLong count ) {
+
+    protected void executeBatch( final MutationBatch batch, final DataMigration.ProgressObserver po,
+                                 final AtomicLong count ) {
         try {
             batch.execute();
 
@@ -135,11 +165,13 @@ public class MvccEntityDataMigrationImpl implements CollectionDataMigration {
         }
     }
 
+
     @Override
     public int getVersion() {
         return entityMigrationStrategy.getVersion();
     }
 
+
     @Override
     public MigrationType getType() {
         return MigrationType.Entities;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/95897754/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
index 90055a4..ade58f6 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
@@ -59,8 +59,6 @@ public class EntityVersionCreatedTaskTest {
 
         // create a latch for the event listener, and add it to the list of events
 
-        final int sizeToReturn = 0;
-
         final Set<EntityVersionCreated> listeners = mock( Set.class );
 
         when ( listeners.size()).thenReturn( 0 );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/95897754/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
index 5b04466..e1e3092 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
@@ -46,9 +46,10 @@ public interface DataMigration <T> {
     /**
      * Migrate the data to the specified version
      * @param observer
+     * @return an observable containing the count of the number of elements migrated
      * @throws Throwable
      */
-    public Observable migrate(final Observable<T> applicationEntityGroup,final ProgressObserver observer) throws Throwable;
+    public Observable<Long> migrate(final Observable<T> applicationEntityGroup,final ProgressObserver observer) throws Throwable;
 
     /**
      * Get the version of this migration.  It must be unique.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/95897754/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
index 98a21dd..d0d81c7 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
@@ -28,10 +28,10 @@ import java.util.List;
  * Get the entity data.  Immutable bean for fast access
  */
 public final class ApplicationEntityGroup<T extends ApplicationScope> {
-    public final ApplicationScope applicationScope;
+    public final T applicationScope;
     public final List<EntityIdScope<T>> entityIds;
 
-    public ApplicationEntityGroup(final ApplicationScope applicationScope, final List<EntityIdScope<T>> entityIds) {
+    public ApplicationEntityGroup(final T applicationScope, final List<EntityIdScope<T>> entityIds) {
         this.applicationScope = applicationScope;
         this.entityIds = entityIds;
     }