You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/07/15 18:45:15 UTC
incubator-usergrid git commit: Updates POC to use a configurable number of import threads, as well as use the I/O scheduler to ensure we're not overwhelming the system with threads.
Repository: incubator-usergrid
Updated Branches:
refs/heads/import-entities-parallel 540111174 -> ae11d89e0
Updates the POC to use a configurable number of import threads, and to use the I/O scheduler to ensure we're not overwhelming the system with threads.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ae11d89e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ae11d89e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ae11d89e
Branch: refs/heads/import-entities-parallel
Commit: ae11d89e0591ffa9d7268ad13040a39adfc04427
Parents: 5401111
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Jul 15 10:45:12 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Jul 15 10:45:12 2015 -0600
----------------------------------------------------------------------
.../persistence/core/rx/RxSchedulerFig.java | 9 +++++++++
.../apache/usergrid/services/AbstractService.java | 18 +++++++++++++++---
2 files changed, 24 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ae11d89e/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
index ac45967..bf0a904 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
@@ -44,6 +44,11 @@ public interface RxSchedulerFig extends GuicyFig {
*/
String IO_SCHEDULER_NAME = "scheduler.io.poolName";
+ /**
+ * The number of threads to use when importing entities into result sets
+ */
+ String IO_IMPORT_THREADS = "scheduler.import.threads";
+
@@ -55,6 +60,10 @@ public interface RxSchedulerFig extends GuicyFig {
@Key(IO_SCHEDULER_NAME)
String getIoSchedulerName();
+ @Default("20")
+ @Key( IO_IMPORT_THREADS)
+ int getImportThreads();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ae11d89e/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
index 396b846..edf8ab2 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
@@ -39,6 +39,8 @@ import org.apache.usergrid.persistence.EntityRef;
import org.apache.usergrid.persistence.Query;
import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.Schema;
+import org.apache.usergrid.persistence.core.rx.RxSchedulerFig;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.security.shiro.utils.SubjectUtils;
import org.apache.usergrid.services.ServiceParameter.IdParameter;
import org.apache.usergrid.services.ServiceParameter.NameParameter;
@@ -48,9 +50,11 @@ import org.apache.usergrid.services.exceptions.ServiceInvocationException;
import org.apache.usergrid.services.exceptions.ServiceResourceNotFoundException;
import org.apache.usergrid.services.exceptions.UnsupportedServiceOperationException;
+import com.google.inject.Injector;
+
import rx.Observable;
+import rx.Scheduler;
import rx.Subscriber;
-import rx.schedulers.Schedulers;
import static org.apache.usergrid.security.shiro.utils.SubjectUtils.getPermissionFromPath;
import static org.apache.usergrid.services.ServiceParameter.filter;
@@ -92,6 +96,11 @@ public abstract class AbstractService implements Service {
protected Map<String, Object> defaultEntityMetadata;
+ private Scheduler rxScheduler;
+ private RxSchedulerFig rxSchedulerFig;
+
+
+
public AbstractService() {
@@ -101,6 +110,9 @@ public abstract class AbstractService implements Service {
public void setServiceManager( ServiceManager sm ) {
this.sm = sm;
em = sm.getEntityManager();
+ final Injector injector = sm.getApplicationContext().getBean( Injector.class );
+ rxScheduler = injector.getInstance( RxTaskScheduler.class ).getAsyncIOScheduler();
+ rxSchedulerFig = injector.getInstance( RxSchedulerFig.class );
}
@@ -449,8 +461,8 @@ public abstract class AbstractService implements Service {
catch ( Exception e ) {
throw new RuntimeException(e);
}
- } ).subscribeOn( Schedulers.io() );
- }, 10 ).toBlocking().lastOrDefault( null );
+ } ).subscribeOn( rxScheduler );
+ }, rxSchedulerFig.getImportThreads() ).toBlocking().lastOrDefault( null );
}