You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/07/15 19:41:17 UTC

[1/3] incubator-usergrid git commit: First pass POC for running import in parallel

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev f62f053de -> ae11d89e0


First pass POC for running import in parallel


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d4201560
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d4201560
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d4201560

Branch: refs/heads/two-dot-o-dev
Commit: d42015605e63016941fd69eeecd67bea16187d95
Parents: c8694f6
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Jul 14 17:27:22 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Jul 14 17:27:22 2015 -0600

----------------------------------------------------------------------
 .../apache/usergrid/persistence/Results.java    |  4 ++
 .../usergrid/services/AbstractService.java      | 69 ++++++++++++++++++--
 2 files changed, 67 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4201560/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
index a16a0f8..e9a3251 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
@@ -518,6 +518,10 @@ public class Results implements Iterable<Entity> {
         level = Level.CORE_PROPERTIES;
     }
 
+    public void setEntity( final int index, final Entity entity){
+        this.entities.set( index, entity );
+    }
+
 
     public Results withEntity( Entity resultEntity ) {
         setEntity( resultEntity );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4201560/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
index 569a58c..396b846 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
@@ -48,6 +48,10 @@ import org.apache.usergrid.services.exceptions.ServiceInvocationException;
 import org.apache.usergrid.services.exceptions.ServiceResourceNotFoundException;
 import org.apache.usergrid.services.exceptions.UnsupportedServiceOperationException;
 
+import rx.Observable;
+import rx.Subscriber;
+import rx.schedulers.Schedulers;
+
 import static org.apache.usergrid.security.shiro.utils.SubjectUtils.getPermissionFromPath;
 import static org.apache.usergrid.services.ServiceParameter.filter;
 import static org.apache.usergrid.services.ServiceParameter.mergeQueries;
@@ -400,14 +404,67 @@ public abstract class AbstractService implements Service {
 
         List<Entity> entities = results.getEntities();
         if ( entities != null ) {
-            for ( Entity entity : entities ) {
-                Entity imported = importEntity( request, entity );
-                if ( imported != entity ) {
-                    logger.debug( "Import returned new entity instace for {} replacing in results set",
-                            entity.getUuid() );
-                    results.replace( imported );
+            importEntitiesParallel(request, results);
+        }
+    }
+
+
+    /**
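+     * Import entities in parallel, writing each non-null imported entity back into the results at its original index
+     * @param request the service request passed to importEntity for each entity
+     * @param results the results whose entities are imported and updated in place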
+     */
+    private void importEntitiesParallel(final ServiceRequest request, final Results results ) {
+
+        //create our tuples
+        final Observable<EntityTuple> tuples = Observable.create( new Observable.OnSubscribe<EntityTuple>() {
+            @Override
+            public void call( final Subscriber<? super EntityTuple> subscriber ) {
+                subscriber.onStart();
+
+                final List<Entity> entities = results.getEntities();
+                final int size = entities.size();
+                for ( int i = 0; i < size && !subscriber.isUnsubscribed(); i++ ) {
+                    subscriber.onNext( new EntityTuple( i, entities.get( i ) ) );
                 }
+
+                subscriber.onCompleted();
             }
+        } );
+
+        //now process them in parallel up to 10 threads
+
+        tuples.flatMap( tuple -> {
+            //map the entity into the tuple
+            return Observable.just( tuple ).doOnNext( parallelTuple -> {
+                //import the entity and set it at index
+                try {
+
+                    final Entity imported = importEntity( request, parallelTuple.entity );
+
+                    if(imported != null) {
+                        results.setEntity( parallelTuple.index, imported );
+                    }
+                }
+                catch ( Exception e ) {
+                    throw new RuntimeException(e);
+                }
+            } ).subscribeOn( Schedulers.io() );
+        }, 10 ).toBlocking().lastOrDefault( null );
+    }
+
+
+    /**
+     * Simple tuple representing an entity and its location within the results
+     */
+    private static final class EntityTuple {
+        private final int index;
+        private final Entity entity;
+
+
+        private EntityTuple( final int index, final Entity entity ) {
+            this.index = index;
+            this.entity = entity;
         }
     }
 


[2/3] incubator-usergrid git commit: Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into import-entities-parallel

Posted by sf...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into import-entities-parallel


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/54011117
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/54011117
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/54011117

Branch: refs/heads/two-dot-o-dev
Commit: 54011117451330f47ee0a218aa54f1f9d5756bbb
Parents: d420156 f62f053
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Jul 14 17:40:16 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Jul 14 17:40:16 2015 -0600

----------------------------------------------------------------------
 stack/awscluster/gatling-cluster-cf.json |  2 +-
 stack/build-tools/pom.xml                |  2 +-
 stack/config/pom.xml                     |  2 +-
 stack/core/pom.xml                       | 14 +++++++-------
 stack/corepersistence/collection/pom.xml |  2 +-
 stack/corepersistence/common/pom.xml     |  2 +-
 stack/corepersistence/graph/pom.xml      |  2 +-
 stack/corepersistence/map/pom.xml        |  2 +-
 stack/corepersistence/model/pom.xml      |  2 +-
 stack/corepersistence/pom.xml            |  2 +-
 stack/corepersistence/queryindex/pom.xml |  2 +-
 stack/corepersistence/queue/pom.xml      |  2 +-
 stack/pom.xml                            |  2 +-
 stack/rest/pom.xml                       |  4 ++--
 stack/services/pom.xml                   |  2 +-
 stack/test-utils/pom.xml                 |  2 +-
 16 files changed, 23 insertions(+), 23 deletions(-)
----------------------------------------------------------------------



[3/3] incubator-usergrid git commit: Updates POC to use a configurable number of import threads, as well as use the I/O scheduler to ensure we're not overwhelming the system with threads.

Posted by sf...@apache.org.
Updates POC to use a configurable number of import threads, as well as use the I/O scheduler to ensure we're not overwhelming the system with threads.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ae11d89e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ae11d89e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ae11d89e

Branch: refs/heads/two-dot-o-dev
Commit: ae11d89e0591ffa9d7268ad13040a39adfc04427
Parents: 5401111
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Jul 15 10:45:12 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Jul 15 10:45:12 2015 -0600

----------------------------------------------------------------------
 .../persistence/core/rx/RxSchedulerFig.java       |  9 +++++++++
 .../apache/usergrid/services/AbstractService.java | 18 +++++++++++++++---
 2 files changed, 24 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ae11d89e/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
index ac45967..bf0a904 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
@@ -44,6 +44,11 @@ public interface RxSchedulerFig extends GuicyFig {
      */
     String IO_SCHEDULER_NAME = "scheduler.io.poolName";
 
+    /**
+     * The number of threads to use when importing entities into result sets
+     */
+    String IO_IMPORT_THREADS = "scheduler.import.threads";
+
 
 
 
@@ -55,6 +60,10 @@ public interface RxSchedulerFig extends GuicyFig {
     @Key(IO_SCHEDULER_NAME)
     String getIoSchedulerName();
 
+    @Default("20")
+    @Key( IO_IMPORT_THREADS)
+    int getImportThreads();
+
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ae11d89e/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
index 396b846..edf8ab2 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
@@ -39,6 +39,8 @@ import org.apache.usergrid.persistence.EntityRef;
 import org.apache.usergrid.persistence.Query;
 import org.apache.usergrid.persistence.Results;
 import org.apache.usergrid.persistence.Schema;
+import org.apache.usergrid.persistence.core.rx.RxSchedulerFig;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.security.shiro.utils.SubjectUtils;
 import org.apache.usergrid.services.ServiceParameter.IdParameter;
 import org.apache.usergrid.services.ServiceParameter.NameParameter;
@@ -48,9 +50,11 @@ import org.apache.usergrid.services.exceptions.ServiceInvocationException;
 import org.apache.usergrid.services.exceptions.ServiceResourceNotFoundException;
 import org.apache.usergrid.services.exceptions.UnsupportedServiceOperationException;
 
+import com.google.inject.Injector;
+
 import rx.Observable;
+import rx.Scheduler;
 import rx.Subscriber;
-import rx.schedulers.Schedulers;
 
 import static org.apache.usergrid.security.shiro.utils.SubjectUtils.getPermissionFromPath;
 import static org.apache.usergrid.services.ServiceParameter.filter;
@@ -92,6 +96,11 @@ public abstract class AbstractService implements Service {
 
     protected Map<String, Object> defaultEntityMetadata;
 
+    private Scheduler rxScheduler;
+    private RxSchedulerFig rxSchedulerFig;
+
+
+
 
     public AbstractService() {
 
@@ -101,6 +110,9 @@ public abstract class AbstractService implements Service {
     public void setServiceManager( ServiceManager sm ) {
         this.sm = sm;
         em = sm.getEntityManager();
+        final Injector injector = sm.getApplicationContext().getBean( Injector.class );
+        rxScheduler = injector.getInstance( RxTaskScheduler.class ).getAsyncIOScheduler();
+        rxSchedulerFig = injector.getInstance( RxSchedulerFig.class );
     }
 
 
@@ -449,8 +461,8 @@ public abstract class AbstractService implements Service {
                 catch ( Exception e ) {
                     throw new RuntimeException(e);
                 }
-            } ).subscribeOn( Schedulers.io() );
-        }, 10 ).toBlocking().lastOrDefault( null );
+            } ).subscribeOn( rxScheduler );
+        }, rxSchedulerFig.getImportThreads() ).toBlocking().lastOrDefault( null );
     }