You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/21 14:40:31 UTC

[16/18] incubator-usergrid git commit: Makes index rebuild parallel.

Makes index rebuild parallel.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d2a0ff2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d2a0ff2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d2a0ff2a

Branch: refs/heads/USERGRID-273-indexbuffer
Commit: d2a0ff2a5887f71d50a07767fbf4e25f6e474f8a
Parents: 4df281e
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Feb 19 22:40:11 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Feb 19 22:40:11 2015 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  2 +-
 .../usergrid/corepersistence/CpWalker.java      | 73 ++++++++++----------
 .../org/apache/usergrid/rest/IndexResource.java | 26 ++++---
 3 files changed, 55 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d2a0ff2a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index e117a73..7eb9f94 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -2751,7 +2751,7 @@ public class CpEntityManager implements EntityManager {
      */
     public void reindex( final EntityManagerFactory.ProgressObserver po ) throws Exception {
 
-        CpWalker walker = new CpWalker( po.getWriteDelayTime() );
+        CpWalker walker = new CpWalker( );
 
         walker.walkCollections( this, application, new CpVisitor() {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d2a0ff2a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
index ff631ed..8a9eed5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
@@ -34,7 +34,7 @@ import org.apache.usergrid.persistence.model.entity.SimpleId;
 import rx.Observable;
 import rx.functions.Action1;
 import rx.functions.Func1;
-
+import rx.schedulers.Schedulers;
 
 
 /**
@@ -44,26 +44,25 @@ public class CpWalker {
 
     private static final Logger logger = LoggerFactory.getLogger( CpWalker.class );
 
-    private final long writeDelayMs;
 
 
     /**
      * Wait the set amount of time between successive writes.
-     * @param writeDelayMs
+     * @param
      */
-    public CpWalker(final long writeDelayMs){
-         this.writeDelayMs = writeDelayMs;
+    public CpWalker(){
+
     }
 
 
-    public void walkCollections( final CpEntityManager em, final EntityRef start, 
+    public void walkCollections( final CpEntityManager em, final EntityRef start,
             final CpVisitor visitor ) throws Exception {
 
         doWalkCollections( em, new SimpleId( start.getUuid(), start.getType() ), visitor );
     }
 
 
-    private void doWalkCollections( 
+    private void doWalkCollections(
             final CpEntityManager em, final Id applicationId, final CpVisitor visitor ) {
 
         final ApplicationScope applicationScope = em.getApplicationScope();
@@ -89,45 +88,45 @@ public class CpWalker {
 
                 logger.debug( "Loading edges of type {} from node {}", edgeType, applicationId );
 
-                return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( 
-                    applicationId,
-                    edgeType,
-                    Long.MAX_VALUE,
-                    SearchByEdgeType.Order.DESCENDING,
-                    null ) );
+                return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( applicationId, edgeType, Long.MAX_VALUE,
+                    SearchByEdgeType.Order.DESCENDING, null ) );
             }
-        } ).doOnNext( new Action1<Edge>() {
-
+        } )
+                 //process our edges in parallel for as much efficiency as possible
+                 .parallel( new Func1<Observable<Edge>, Observable<Edge>>() {
             @Override
-            public void call( Edge edge ) {
+            public Observable<Edge> call( final Observable<Edge> edgeObservable ) {
+                //visit and update the entity
+                return edgeObservable.doOnNext( new Action1<Edge>() {
 
-                logger.info( "Re-indexing edge {}", edge );
+                                    @Override
+                                    public void call( Edge edge ) {
 
-                EntityRef targetNodeEntityRef = new SimpleEntityRef( 
-                        edge.getTargetNode().getType(), edge.getTargetNode().getUuid() );
+                                        logger.info( "Re-indexing edge {}", edge );
 
-                Entity entity;
-                try {
-                    entity = em.get( targetNodeEntityRef );
-                }
-                catch ( Exception ex ) {
-                    logger.error( "Error getting sourceEntity {}:{}, continuing", 
-                            targetNodeEntityRef.getType(), targetNodeEntityRef.getUuid() );
-                    return;
-                }
+                                        EntityRef targetNodeEntityRef =
+                                            new SimpleEntityRef( edge.getTargetNode().getType(), edge.getTargetNode().getUuid() );
 
+                                        Entity entity;
+                                        try {
+                                            entity = em.get( targetNodeEntityRef );
+                                        }
+                                        catch ( Exception ex ) {
+                                            logger.error( "Error getting sourceEntity {}:{}, continuing", targetNodeEntityRef.getType(),
+                                                targetNodeEntityRef.getUuid() );
+                                            return;
+                                        }
 
-                String collName = CpNamingUtils.getCollectionName( edge.getType() );
 
-                visitor.visitCollectionEntry( em, collName, entity );
+                                        String collName = CpNamingUtils.getCollectionName( edge.getType() );
 
-                try {
-                    Thread.sleep( writeDelayMs );
-                }
-                catch ( InterruptedException e ) {
-                    throw new RuntimeException( "Unable to wait" );
-                }
+                                        visitor.visitCollectionEntry( em, collName, entity );
+                                    }
+                                } );
             }
-        } ).toBlocking().lastOrDefault( null ); // end foreach on edges
+        }, Schedulers.io() )
+
+        //wait for it to complete
+        .toBlocking().lastOrDefault( null ); // end foreach on edges
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d2a0ff2a/stack/rest/src/main/java/org/apache/usergrid/rest/IndexResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/IndexResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/IndexResource.java
index 5cbc499..acce2d8 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/IndexResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/IndexResource.java
@@ -117,22 +117,32 @@ public class IndexResource extends AbstractContextResource {
 
         final UUID appId = UUIDUtils.tryExtractUUID(applicationIdStr);
         ApiResponse response = createApiResponse();
-        response.setAction( "rebuild indexes" );
+        response.setAction( "rebuild indexes started" );
 
+        final EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
 
-        final EntityManager em = emf.getEntityManager( appId );
+            @Override
+            public void onProgress( final EntityRef entity ) {
+                logger.info( "Indexing entity {}:{}", entity.getType(), entity.getUuid() );
+            }
+
+
+            @Override
+            public long getWriteDelayTime() {
+                return delay;
+            }
+        };
 
-        final Set<String> collectionNames = em.getApplicationCollections();
 
         final Thread rebuild = new Thread() {
 
             @Override
             public void run() {
-                for ( String collectionName : collectionNames )
-
-
-                {
-                    rebuildCollection( appId, collectionName, delay );
+                try {
+                    emf.rebuildApplicationIndexes( appId, po );
+                }
+                catch ( Exception e ) {
+                    logger.error( "Unable to re-index application" );
                 }
             }
         };