You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/11 16:04:45 UTC
incubator-usergrid git commit: add timer and meter to collection
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-466 06fe6097c -> 8ccc3ec5c
add timer and meter to collection
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8ccc3ec5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8ccc3ec5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8ccc3ec5
Branch: refs/heads/USERGRID-466
Commit: 8ccc3ec5cf9a29381f2acdb63331f9de447dfd64
Parents: 06fe609
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Mar 11 09:04:41 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Mar 11 09:04:41 2015 -0600
----------------------------------------------------------------------
.../EntityCollectionManagerFactoryImpl.java | 8 +-
.../impl/EntityCollectionManagerImpl.java | 120 ++++++++++++++-----
2 files changed, 99 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8ccc3ec5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index 4e04c2e..bb1b56a 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -34,6 +34,7 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.core.guice.ProxyImpl;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.task.TaskExecutor;
import java.util.concurrent.ExecutionException;
@@ -80,6 +81,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
private final EntityDeletedFactory entityDeletedFactory;
private final TaskExecutor taskExecutor;
private final EntityCacheFig entityCacheFig;
+ private final MetricsFactory metricsFactory;
private LoadingCache<CollectionScope, EntityCollectionManager> ecmCache =
CacheBuilder.newBuilder().maximumSize( 1000 )
@@ -91,7 +93,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
writeOptimisticVerify, writeCommit, rollback, markStart, markCommit,
entitySerializationStrategy, uniqueValueSerializationStrategy,
mvccLogEntrySerializationStrategy, keyspace, entityVersionCleanupFactory,
- entityVersionCreatedFactory, entityDeletedFactory, taskExecutor, scope );
+ entityVersionCreatedFactory, entityDeletedFactory, taskExecutor, scope, metricsFactory );
final EntityCollectionManager proxy = new CachedEntityCollectionManager(entityCacheFig, target );
@@ -117,7 +119,8 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
final EntityVersionCreatedFactory entityVersionCreatedFactory,
final EntityDeletedFactory entityDeletedFactory,
@CollectionTaskExecutor final TaskExecutor taskExecutor,
- final EntityCacheFig entityCacheFig) {
+ final EntityCacheFig entityCacheFig,
+ MetricsFactory metricsFactory) {
this.writeStart = writeStart;
this.writeUpdate = writeUpdate;
@@ -136,6 +139,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
this.entityDeletedFactory = entityDeletedFactory;
this.taskExecutor = taskExecutor;
this.entityCacheFig = entityCacheFig;
+ this.metricsFactory = metricsFactory;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8ccc3ec5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 7c467c6..391e9a5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -23,6 +23,9 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,8 +75,10 @@ import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
import org.apache.usergrid.persistence.core.task.Task;
import org.apache.usergrid.persistence.core.task.TaskExecutor;
+import rx.Notification;
import rx.Observable;
import rx.Subscriber;
+import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
@@ -113,6 +118,12 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
private final TaskExecutor taskExecutor;
private final Keyspace keyspace;
+ private final Timer writeTimer;
+ private final Meter writeMeter;
+ private final Timer deleteTimer;
+ private final Timer updateTimer;
+ private final Timer loadTimer;
+ private final Timer getLatestTimer;
@Inject
@@ -133,7 +144,9 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
final EntityVersionCreatedFactory entityVersionCreatedFactory,
final EntityDeletedFactory entityDeletedFactory,
@CollectionTaskExecutor final TaskExecutor taskExecutor,
- @Assisted final CollectionScope collectionScope
+ @Assisted final CollectionScope collectionScope,
+ final MetricsFactory metricsFactory
+
) {
this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
this.entitySerializationStrategy = entitySerializationStrategy;
@@ -160,6 +173,12 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
this.collectionScope = collectionScope;
this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
+ this.writeTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"write.timer");
+ this.writeMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "write.meter");
+ this.deleteTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "delete.timer");
+ this.updateTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"update.timer");
+ this.loadTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"load.timer");
+ this.getLatestTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"latest.timer");
}
@@ -184,6 +203,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
// observable = Concurrent.concurrent( observable, Schedulers.io(), new WaitZip(),
// writeVerifyUnique, writeOptimisticVerify );
+ final Timer.Context timer = writeTimer.time();
return observable.map(writeCommit).doOnNext(new Action1<Entity>() {
@Override
public void call(final Entity entity) {
@@ -192,7 +212,19 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
taskExecutor.submit(entityVersionCleanupFactory.getTask(collectionScope, entityId,entity.getVersion()));
//post-processing to come later. leave it empty for now.
}
- }).doOnError(rollback);
+ }).doOnError(rollback)
+ .doOnEach(new Action1<Notification<? super Entity>>() {
+ @Override
+ public void call(Notification<? super Entity> notification) {
+ writeMeter.mark();
+ }
+ })
+ .doOnCompleted(new Action0() {
+ @Override
+ public void call() {
+ timer.stop();
+ }
+ });
}
@@ -203,21 +235,27 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" );
Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
+ final Timer.Context timer = deleteTimer.time();
Observable<Id> o = Observable.from(new CollectionIoEvent<Id>(collectionScope, entityId))
- .map( markStart)
- .doOnNext( markCommit)
- .map( new Func1<CollectionIoEvent<MvccEntity>, Id>() {
-
+ .map(markStart)
+ .doOnNext(markCommit)
+ .map(new Func1<CollectionIoEvent<MvccEntity>, Id>() {
+
+ @Override
+ public Id call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) {
+ MvccEntity entity = mvccEntityCollectionIoEvent.getEvent();
+ Task<Void> task = entityDeletedFactory
+ .getTask(collectionScope, entity.getId(), entity.getVersion());
+ taskExecutor.submit(task);
+ return entity.getId();
+ }
+ }
+ ) .doOnCompleted(new Action0() {
@Override
- public Id call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) {
- MvccEntity entity = mvccEntityCollectionIoEvent.getEvent();
- Task<Void> task = entityDeletedFactory
- .getTask( collectionScope, entity.getId(), entity.getVersion());
- taskExecutor.submit(task);
- return entity.getId();
+ public void call() {
+ timer.stop();
}
- }
- );
+ });;
return o;
}
@@ -230,18 +268,25 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Preconditions.checkNotNull( entityId.getUuid(), "Entity id uuid required in load stage" );
Preconditions.checkNotNull( entityId.getType(), "Entity id type required in load stage" );
- return load( Collections.singleton( entityId ) ).flatMap( new Func1<EntitySet, Observable<Entity>>() {
+ final Timer.Context timer = loadTimer.time();
+ return load( Collections.singleton( entityId ) ).flatMap(new Func1<EntitySet, Observable<Entity>>() {
@Override
- public Observable<Entity> call( final EntitySet entitySet ) {
- final MvccEntity entity = entitySet.getEntity( entityId );
+ public Observable<Entity> call(final EntitySet entitySet) {
+ final MvccEntity entity = entitySet.getEntity(entityId);
- if ( entity == null || !entity.getEntity().isPresent() ) {
+ if (entity == null || !entity.getEntity().isPresent()) {
return Observable.empty();
}
- return Observable.from( entity.getEntity().get() );
+ return Observable.from(entity.getEntity().get());
}
- } );
+ })
+ .doOnCompleted(new Action0() {
+ @Override
+ public void call() {
+ timer.stop();
+ }
+ });
}
@@ -253,6 +298,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Preconditions.checkNotNull( entityIds, "entityIds cannot be null" );
+ final Timer.Context timer = loadTimer.time();
return Observable.create( new Observable.OnSubscribe<EntitySet>() {
@@ -269,7 +315,13 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
subscriber.onError( e );
}
}
- } );
+ } )
+ .doOnCompleted(new Action0() {
+ @Override
+ public void call() {
+ timer.stop();
+ }
+ });
}
@@ -314,13 +366,14 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeUpdate );
- return observable.map( writeCommit ).doOnNext( new Action1<Entity>() {
+ final Timer.Context timer = updateTimer.time();
+ return observable.map( writeCommit ).doOnNext(new Action1<Entity>() {
@Override
- public void call( final Entity entity ) {
- logger.debug( "sending entity to the queue" );
+ public void call(final Entity entity) {
+ logger.debug("sending entity to the queue");
//we an update, signal the fix
- taskExecutor.submit(entityVersionCreatedFactory.getTask(collectionScope,entity));
+ taskExecutor.submit(entityVersionCreatedFactory.getTask(collectionScope, entity));
//TODO T.N Change this to fire a task
// Observable.from( new CollectionIoEvent<Id>(collectionScope,
@@ -328,7 +381,13 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
}
- } ).doOnError( rollback );
+ }).doOnError(rollback)
+ .doOnCompleted(new Action0() {
+ @Override
+ public void call() {
+ timer.stop();
+ }
+ });
}
@@ -362,6 +421,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
@Override
public Observable<VersionSet> getLatestVersion( final Collection<Id> entityIds ) {
+ final Timer.Context timer = getLatestTimer.time();
return Observable.create( new Observable.OnSubscribe<VersionSet>() {
@Override
@@ -377,7 +437,13 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
subscriber.onError( e );
}
}
- } );
+ } )
+ .doOnCompleted(new Action0() {
+ @Override
+ public void call() {
+ timer.stop();
+ }
+ });
}