You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/07/15 19:41:17 UTC
[1/3] incubator-usergrid git commit: First pass POC for running
import in parallel
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-dev f62f053de -> ae11d89e0
First pass POC for running import in parallel
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d4201560
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d4201560
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d4201560
Branch: refs/heads/two-dot-o-dev
Commit: d42015605e63016941fd69eeecd67bea16187d95
Parents: c8694f6
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Jul 14 17:27:22 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Jul 14 17:27:22 2015 -0600
----------------------------------------------------------------------
.../apache/usergrid/persistence/Results.java | 4 ++
.../usergrid/services/AbstractService.java | 69 ++++++++++++++++++--
2 files changed, 67 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4201560/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
index a16a0f8..e9a3251 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
@@ -518,6 +518,10 @@ public class Results implements Iterable<Entity> {
level = Level.CORE_PROPERTIES;
}
+ public void setEntity( final int index, final Entity entity){
+ this.entities.set( index, entity );
+ }
+
public Results withEntity( Entity resultEntity ) {
setEntity( resultEntity );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4201560/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
index 569a58c..396b846 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
@@ -48,6 +48,10 @@ import org.apache.usergrid.services.exceptions.ServiceInvocationException;
import org.apache.usergrid.services.exceptions.ServiceResourceNotFoundException;
import org.apache.usergrid.services.exceptions.UnsupportedServiceOperationException;
+import rx.Observable;
+import rx.Subscriber;
+import rx.schedulers.Schedulers;
+
import static org.apache.usergrid.security.shiro.utils.SubjectUtils.getPermissionFromPath;
import static org.apache.usergrid.services.ServiceParameter.filter;
import static org.apache.usergrid.services.ServiceParameter.mergeQueries;
@@ -400,14 +404,67 @@ public abstract class AbstractService implements Service {
List<Entity> entities = results.getEntities();
if ( entities != null ) {
- for ( Entity entity : entities ) {
- Entity imported = importEntity( request, entity );
- if ( imported != entity ) {
- logger.debug( "Import returned new entity instace for {} replacing in results set",
- entity.getUuid() );
- results.replace( imported );
+ importEntitiesParallel(request, results);
+ }
+ }
+
+
+ /**
+ * Import entities in parallel
+ * @param request
+ * @param results
+ */
+ private void importEntitiesParallel(final ServiceRequest request, final Results results ) {
+
+ //create our tuples
+ final Observable<EntityTuple> tuples = Observable.create( new Observable.OnSubscribe<EntityTuple>() {
+ @Override
+ public void call( final Subscriber<? super EntityTuple> subscriber ) {
+ subscriber.onStart();
+
+ final List<Entity> entities = results.getEntities();
+ final int size = entities.size();
+ for ( int i = 0; i < size && !subscriber.isUnsubscribed(); i++ ) {
+ subscriber.onNext( new EntityTuple( i, entities.get( i ) ) );
}
+
+ subscriber.onCompleted();
}
+ } );
+
+ //now process them in parallel up to 10 threads
+
+ tuples.flatMap( tuple -> {
+ //map the entity into the tuple
+ return Observable.just( tuple ).doOnNext( parallelTuple -> {
+ //import the entity and set it at index
+ try {
+
+ final Entity imported = importEntity( request, parallelTuple.entity );
+
+ if(imported != null) {
+ results.setEntity( parallelTuple.index, imported );
+ }
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException(e);
+ }
+ } ).subscribeOn( Schedulers.io() );
+ }, 10 ).toBlocking().lastOrDefault( null );
+ }
+
+
+ /**
+ * Simple tuple representing and entity and it's location within the results
+ */
+ private static final class EntityTuple {
+ private final int index;
+ private final Entity entity;
+
+
+ private EntityTuple( final int index, final Entity entity ) {
+ this.index = index;
+ this.entity = entity;
}
}
[2/3] incubator-usergrid git commit: Merge branch 'two-dot-o-dev' of
https://git-wip-us.apache.org/repos/asf/incubator-usergrid into
import-entities-parallel
Posted by sf...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into import-entities-parallel
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/54011117
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/54011117
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/54011117
Branch: refs/heads/two-dot-o-dev
Commit: 54011117451330f47ee0a218aa54f1f9d5756bbb
Parents: d420156 f62f053
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Jul 14 17:40:16 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Jul 14 17:40:16 2015 -0600
----------------------------------------------------------------------
stack/awscluster/gatling-cluster-cf.json | 2 +-
stack/build-tools/pom.xml | 2 +-
stack/config/pom.xml | 2 +-
stack/core/pom.xml | 14 +++++++-------
stack/corepersistence/collection/pom.xml | 2 +-
stack/corepersistence/common/pom.xml | 2 +-
stack/corepersistence/graph/pom.xml | 2 +-
stack/corepersistence/map/pom.xml | 2 +-
stack/corepersistence/model/pom.xml | 2 +-
stack/corepersistence/pom.xml | 2 +-
stack/corepersistence/queryindex/pom.xml | 2 +-
stack/corepersistence/queue/pom.xml | 2 +-
stack/pom.xml | 2 +-
stack/rest/pom.xml | 4 ++--
stack/services/pom.xml | 2 +-
stack/test-utils/pom.xml | 2 +-
16 files changed, 23 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
[3/3] incubator-usergrid git commit: Updates POC to use a
configurable number of import threads,
as well as use the I/O scheduler to ensure we're not overwhelming the system
with threads.
Posted by sf...@apache.org.
Updates POC to use a configurable number of import threads, as well as use the I/O scheduler to ensure we're not overwhelming the system with threads.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ae11d89e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ae11d89e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ae11d89e
Branch: refs/heads/two-dot-o-dev
Commit: ae11d89e0591ffa9d7268ad13040a39adfc04427
Parents: 5401111
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Jul 15 10:45:12 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Jul 15 10:45:12 2015 -0600
----------------------------------------------------------------------
.../persistence/core/rx/RxSchedulerFig.java | 9 +++++++++
.../apache/usergrid/services/AbstractService.java | 18 +++++++++++++++---
2 files changed, 24 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ae11d89e/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
index ac45967..bf0a904 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
@@ -44,6 +44,11 @@ public interface RxSchedulerFig extends GuicyFig {
*/
String IO_SCHEDULER_NAME = "scheduler.io.poolName";
+ /**
+ * The number of threads to use when importing entities into result sets
+ */
+ String IO_IMPORT_THREADS = "scheduler.import.threads";
+
@@ -55,6 +60,10 @@ public interface RxSchedulerFig extends GuicyFig {
@Key(IO_SCHEDULER_NAME)
String getIoSchedulerName();
+ @Default("20")
+ @Key( IO_IMPORT_THREADS)
+ int getImportThreads();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ae11d89e/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
index 396b846..edf8ab2 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
@@ -39,6 +39,8 @@ import org.apache.usergrid.persistence.EntityRef;
import org.apache.usergrid.persistence.Query;
import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.Schema;
+import org.apache.usergrid.persistence.core.rx.RxSchedulerFig;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.security.shiro.utils.SubjectUtils;
import org.apache.usergrid.services.ServiceParameter.IdParameter;
import org.apache.usergrid.services.ServiceParameter.NameParameter;
@@ -48,9 +50,11 @@ import org.apache.usergrid.services.exceptions.ServiceInvocationException;
import org.apache.usergrid.services.exceptions.ServiceResourceNotFoundException;
import org.apache.usergrid.services.exceptions.UnsupportedServiceOperationException;
+import com.google.inject.Injector;
+
import rx.Observable;
+import rx.Scheduler;
import rx.Subscriber;
-import rx.schedulers.Schedulers;
import static org.apache.usergrid.security.shiro.utils.SubjectUtils.getPermissionFromPath;
import static org.apache.usergrid.services.ServiceParameter.filter;
@@ -92,6 +96,11 @@ public abstract class AbstractService implements Service {
protected Map<String, Object> defaultEntityMetadata;
+ private Scheduler rxScheduler;
+ private RxSchedulerFig rxSchedulerFig;
+
+
+
public AbstractService() {
@@ -101,6 +110,9 @@ public abstract class AbstractService implements Service {
public void setServiceManager( ServiceManager sm ) {
this.sm = sm;
em = sm.getEntityManager();
+ final Injector injector = sm.getApplicationContext().getBean( Injector.class );
+ rxScheduler = injector.getInstance( RxTaskScheduler.class ).getAsyncIOScheduler();
+ rxSchedulerFig = injector.getInstance( RxSchedulerFig.class );
}
@@ -449,8 +461,8 @@ public abstract class AbstractService implements Service {
catch ( Exception e ) {
throw new RuntimeException(e);
}
- } ).subscribeOn( Schedulers.io() );
- }, 10 ).toBlocking().lastOrDefault( null );
+ } ).subscribeOn( rxScheduler );
+ }, rxSchedulerFig.getImportThreads() ).toBlocking().lastOrDefault( null );
}