You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/07/15 18:45:15 UTC

incubator-usergrid git commit: Updates POC to use a configurable number of import threads, as well as use the I/O scheduler to ensure we're not overwhelming the system with threads.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/import-entities-parallel 540111174 -> ae11d89e0


Updates POC to use a configurable number of import threads, as well as use the I/O scheduler to ensure we're not overwhelming the system with threads.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ae11d89e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ae11d89e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ae11d89e

Branch: refs/heads/import-entities-parallel
Commit: ae11d89e0591ffa9d7268ad13040a39adfc04427
Parents: 5401111
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Jul 15 10:45:12 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Jul 15 10:45:12 2015 -0600

----------------------------------------------------------------------
 .../persistence/core/rx/RxSchedulerFig.java       |  9 +++++++++
 .../apache/usergrid/services/AbstractService.java | 18 +++++++++++++++---
 2 files changed, 24 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ae11d89e/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
index ac45967..bf0a904 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
@@ -44,6 +44,11 @@ public interface RxSchedulerFig extends GuicyFig {
      */
     String IO_SCHEDULER_NAME = "scheduler.io.poolName";
 
+    /**
+     * The number of threads to use when importing entities into result sets
+     */
+    String IO_IMPORT_THREADS = "scheduler.import.threads";
+
 
 
 
@@ -55,6 +60,10 @@ public interface RxSchedulerFig extends GuicyFig {
     @Key(IO_SCHEDULER_NAME)
     String getIoSchedulerName();
 
+    @Default("20")
+    @Key( IO_IMPORT_THREADS)
+    int getImportThreads();
+
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ae11d89e/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
index 396b846..edf8ab2 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
@@ -39,6 +39,8 @@ import org.apache.usergrid.persistence.EntityRef;
 import org.apache.usergrid.persistence.Query;
 import org.apache.usergrid.persistence.Results;
 import org.apache.usergrid.persistence.Schema;
+import org.apache.usergrid.persistence.core.rx.RxSchedulerFig;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.security.shiro.utils.SubjectUtils;
 import org.apache.usergrid.services.ServiceParameter.IdParameter;
 import org.apache.usergrid.services.ServiceParameter.NameParameter;
@@ -48,9 +50,11 @@ import org.apache.usergrid.services.exceptions.ServiceInvocationException;
 import org.apache.usergrid.services.exceptions.ServiceResourceNotFoundException;
 import org.apache.usergrid.services.exceptions.UnsupportedServiceOperationException;
 
+import com.google.inject.Injector;
+
 import rx.Observable;
+import rx.Scheduler;
 import rx.Subscriber;
-import rx.schedulers.Schedulers;
 
 import static org.apache.usergrid.security.shiro.utils.SubjectUtils.getPermissionFromPath;
 import static org.apache.usergrid.services.ServiceParameter.filter;
@@ -92,6 +96,11 @@ public abstract class AbstractService implements Service {
 
     protected Map<String, Object> defaultEntityMetadata;
 
+    private Scheduler rxScheduler;
+    private RxSchedulerFig rxSchedulerFig;
+
+
+
 
     public AbstractService() {
 
@@ -101,6 +110,9 @@ public abstract class AbstractService implements Service {
     public void setServiceManager( ServiceManager sm ) {
         this.sm = sm;
         em = sm.getEntityManager();
+        final Injector injector = sm.getApplicationContext().getBean( Injector.class );
+        rxScheduler = injector.getInstance( RxTaskScheduler.class ).getAsyncIOScheduler();
+        rxSchedulerFig = injector.getInstance( RxSchedulerFig.class );
     }
 
 
@@ -449,8 +461,8 @@ public abstract class AbstractService implements Service {
                 catch ( Exception e ) {
                     throw new RuntimeException(e);
                 }
-            } ).subscribeOn( Schedulers.io() );
-        }, 10 ).toBlocking().lastOrDefault( null );
+            } ).subscribeOn( rxScheduler );
+        }, rxSchedulerFig.getImportThreads() ).toBlocking().lastOrDefault( null );
     }