You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/07/24 17:22:09 UTC
[41/50] [abbrv] incubator-usergrid git commit: add single timer
add single timer
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b1393b4e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b1393b4e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b1393b4e
Branch: refs/heads/USERGRID-869
Commit: b1393b4ed242a9c6de3b76f9038aa9def7a0d428
Parents: ca4575d
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Jul 22 09:16:56 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Jul 22 09:16:56 2015 -0600
----------------------------------------------------------------------
.../usergrid/services/AbstractService.java | 64 ++++++++++----------
1 file changed, 31 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b1393b4e/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
index 1c04da9..95a651d 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
@@ -28,6 +28,7 @@ import java.util.UUID;
import com.codahale.metrics.Timer;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
@@ -447,45 +448,42 @@ public abstract class AbstractService implements Service {
* @param results
*/
private void importEntitiesParallel(final ServiceRequest request, final Results results ) {
- Timer.Context timer = entitiesParallelGetTimer.time();
- try {
- //create our tuples
- final Observable<EntityTuple> tuples = Observable.create(new Observable.OnSubscribe<EntityTuple>() {
- @Override
- public void call(final Subscriber<? super EntityTuple> subscriber) {
- subscriber.onStart();
-
- final List<Entity> entities = results.getEntities();
- final int size = entities.size();
- for (int i = 0; i < size && !subscriber.isUnsubscribed(); i++) {
- subscriber.onNext(new EntityTuple(i, entities.get(i)));
- }
-
- subscriber.onCompleted();
+ //create our tuples
+ final Observable<EntityTuple> tuples = Observable.create(new Observable.OnSubscribe<EntityTuple>() {
+ @Override
+ public void call(final Subscriber<? super EntityTuple> subscriber) {
+ subscriber.onStart();
+
+ final List<Entity> entities = results.getEntities();
+ final int size = entities.size();
+ for (int i = 0; i < size && !subscriber.isUnsubscribed(); i++) {
+ subscriber.onNext(new EntityTuple(i, entities.get(i)));
}
- });
- //now process them in parallel up to 10 threads
+ subscriber.onCompleted();
+ }
+ });
+
+ //now process them in parallel up to 10 threads
- tuples.flatMap(tuple -> {
- //map the entity into the tuple
- return Observable.just(tuple).doOnNext(parallelTuple -> {
- //import the entity and set it at index
- try {
+ Observable tuplesObservable = tuples.flatMap(tuple -> {
+ //map the entity into the tuple
+ return Observable.just(tuple).doOnNext(parallelTuple -> {
+ //import the entity and set it at index
+ try {
- final Entity imported = importEntity(request, parallelTuple.entity);
+ final Entity imported = importEntity(request, parallelTuple.entity);
- if (imported != null) {
- results.setEntity(parallelTuple.index, imported);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
+ if (imported != null) {
+ results.setEntity(parallelTuple.index, imported);
}
- }).subscribeOn(rxScheduler);
- }, rxSchedulerFig.getImportThreads()).toBlocking().lastOrDefault(null);
- } finally {
- timer.stop();
- }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }).subscribeOn(rxScheduler);
+ }, rxSchedulerFig.getImportThreads());
+
+ ObservableTimer.time(tuplesObservable, entitiesParallelGetTimer).toBlocking().lastOrDefault(null);
}