You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/18 21:56:09 UTC

[09/50] incubator-usergrid git commit: add timer and meter to collection

add timer and meter to collection


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8ccc3ec5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8ccc3ec5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8ccc3ec5

Branch: refs/heads/USERGRID-460
Commit: 8ccc3ec5cf9a29381f2acdb63331f9de447dfd64
Parents: 06fe609
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Mar 11 09:04:41 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Mar 11 09:04:41 2015 -0600

----------------------------------------------------------------------
 .../EntityCollectionManagerFactoryImpl.java     |   8 +-
 .../impl/EntityCollectionManagerImpl.java       | 120 ++++++++++++++-----
 2 files changed, 99 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8ccc3ec5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index 4e04c2e..bb1b56a 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -34,6 +34,7 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.core.guice.ProxyImpl;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.task.TaskExecutor;
 
 import java.util.concurrent.ExecutionException;
@@ -80,6 +81,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
     private final EntityDeletedFactory entityDeletedFactory;
     private final TaskExecutor taskExecutor;
     private final EntityCacheFig entityCacheFig;
+    private final MetricsFactory metricsFactory;
 
     private LoadingCache<CollectionScope, EntityCollectionManager> ecmCache =
         CacheBuilder.newBuilder().maximumSize( 1000 )
@@ -91,7 +93,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
                                 writeOptimisticVerify, writeCommit, rollback, markStart, markCommit,
                                 entitySerializationStrategy, uniqueValueSerializationStrategy,
                                 mvccLogEntrySerializationStrategy, keyspace, entityVersionCleanupFactory,
-                                entityVersionCreatedFactory, entityDeletedFactory, taskExecutor, scope );
+                                entityVersionCreatedFactory, entityDeletedFactory, taskExecutor, scope, metricsFactory );
 
 
                             final EntityCollectionManager proxy = new CachedEntityCollectionManager(entityCacheFig, target  );
@@ -117,7 +119,8 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
                                                final EntityVersionCreatedFactory entityVersionCreatedFactory,
                                                final EntityDeletedFactory entityDeletedFactory,
                                                @CollectionTaskExecutor final TaskExecutor taskExecutor,
-                                              final EntityCacheFig entityCacheFig) {
+                                              final EntityCacheFig entityCacheFig,
+                                               MetricsFactory metricsFactory) {
 
         this.writeStart = writeStart;
         this.writeUpdate = writeUpdate;
@@ -136,6 +139,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
         this.entityDeletedFactory = entityDeletedFactory;
         this.taskExecutor = taskExecutor;
         this.entityCacheFig = entityCacheFig;
+        this.metricsFactory = metricsFactory;
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8ccc3ec5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 7c467c6..391e9a5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -23,6 +23,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,8 +75,10 @@ import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.core.task.TaskExecutor;
 
+import rx.Notification;
 import rx.Observable;
 import rx.Subscriber;
+import rx.functions.Action0;
 import rx.functions.Action1;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
@@ -113,6 +118,12 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     private final TaskExecutor taskExecutor;
 
     private final Keyspace keyspace;
+    private final Timer writeTimer;
+    private final Meter writeMeter;
+    private final Timer deleteTimer;
+    private final Timer updateTimer;
+    private final Timer loadTimer;
+    private final Timer getLatestTimer;
 
 
     @Inject
@@ -133,7 +144,9 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         final EntityVersionCreatedFactory          entityVersionCreatedFactory,
         final EntityDeletedFactory                 entityDeletedFactory,
         @CollectionTaskExecutor final TaskExecutor taskExecutor,
-        @Assisted final CollectionScope            collectionScope
+        @Assisted final CollectionScope            collectionScope,
+        final MetricsFactory metricsFactory
+
     ) {
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.entitySerializationStrategy = entitySerializationStrategy;
@@ -160,6 +173,12 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
         this.collectionScope = collectionScope;
         this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
+        this.writeTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"write.timer");
+        this.writeMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "write.meter");
+        this.deleteTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "delete.timer");
+        this.updateTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"update.timer");
+        this.loadTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"load.timer");
+        this.getLatestTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"latest.timer");
     }
 
 
@@ -184,6 +203,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         // observable = Concurrent.concurrent( observable, Schedulers.io(), new WaitZip(),
         //                  writeVerifyUnique, writeOptimisticVerify );
 
+        final Timer.Context timer = writeTimer.time();
         return observable.map(writeCommit).doOnNext(new Action1<Entity>() {
             @Override
             public void call(final Entity entity) {
@@ -192,7 +212,19 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                 taskExecutor.submit(entityVersionCleanupFactory.getTask(collectionScope, entityId,entity.getVersion()));
                 //post-processing to come later. leave it empty for now.
             }
-        }).doOnError(rollback);
+        }).doOnError(rollback)
+            .doOnEach(new Action1<Notification<? super Entity>>() {
+                @Override
+                public void call(Notification<? super Entity> notification) {
+                    writeMeter.mark();
+                }
+            })
+            .doOnCompleted(new Action0() {
+                @Override
+                public void call() {
+                    timer.stop();
+                }
+            });
     }
 
 
@@ -203,21 +235,27 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" );
         Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
 
+        final Timer.Context timer = deleteTimer.time();
         Observable<Id> o = Observable.from(new CollectionIoEvent<Id>(collectionScope, entityId))
-            .map( markStart)
-            .doOnNext( markCommit)
-            .map( new Func1<CollectionIoEvent<MvccEntity>, Id>() {
-
+            .map(markStart)
+            .doOnNext(markCommit)
+            .map(new Func1<CollectionIoEvent<MvccEntity>, Id>() {
+
+                     @Override
+                     public Id call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) {
+                         MvccEntity entity = mvccEntityCollectionIoEvent.getEvent();
+                         Task<Void> task = entityDeletedFactory
+                             .getTask(collectionScope, entity.getId(), entity.getVersion());
+                         taskExecutor.submit(task);
+                         return entity.getId();
+                     }
+                 }
+            ) .doOnCompleted(new Action0() {
                 @Override
-                public Id call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) {
-                    MvccEntity entity = mvccEntityCollectionIoEvent.getEvent();
-                    Task<Void> task = entityDeletedFactory
-                        .getTask( collectionScope, entity.getId(), entity.getVersion());
-                    taskExecutor.submit(task);
-                    return entity.getId();
+                public void call() {
+                    timer.stop();
                 }
-            }
-        );
+            });;
 
         return o;
     }
@@ -230,18 +268,25 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         Preconditions.checkNotNull( entityId.getUuid(), "Entity id uuid required in load stage" );
         Preconditions.checkNotNull( entityId.getType(), "Entity id type required in load stage" );
 
-        return load( Collections.singleton( entityId ) ).flatMap( new Func1<EntitySet, Observable<Entity>>() {
+        final Timer.Context timer = loadTimer.time();
+        return load( Collections.singleton( entityId ) ).flatMap(new Func1<EntitySet, Observable<Entity>>() {
             @Override
-            public Observable<Entity> call( final EntitySet entitySet ) {
-                final MvccEntity entity = entitySet.getEntity( entityId );
+            public Observable<Entity> call(final EntitySet entitySet) {
+                final MvccEntity entity = entitySet.getEntity(entityId);
 
-                if ( entity == null || !entity.getEntity().isPresent() ) {
+                if (entity == null || !entity.getEntity().isPresent()) {
                     return Observable.empty();
                 }
 
-                return Observable.from( entity.getEntity().get() );
+                return Observable.from(entity.getEntity().get());
             }
-        } );
+        })
+            .doOnCompleted(new Action0() {
+                @Override
+                public void call() {
+                    timer.stop();
+                }
+            });
     }
 
 
@@ -253,6 +298,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
         Preconditions.checkNotNull( entityIds, "entityIds cannot be null" );
 
+        final Timer.Context timer = loadTimer.time();
 
         return Observable.create( new Observable.OnSubscribe<EntitySet>() {
 
@@ -269,7 +315,13 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                     subscriber.onError( e );
                 }
             }
-        } );
+        } )
+            .doOnCompleted(new Action0() {
+                @Override
+                public void call() {
+                    timer.stop();
+                }
+            });
     }
 
 
@@ -314,13 +366,14 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeUpdate );
 
 
-        return observable.map( writeCommit ).doOnNext( new Action1<Entity>() {
+        final Timer.Context timer = updateTimer.time();
+        return observable.map( writeCommit ).doOnNext(new Action1<Entity>() {
             @Override
-            public void call( final Entity entity ) {
-                logger.debug( "sending entity to the queue" );
+            public void call(final Entity entity) {
+                logger.debug("sending entity to the queue");
 
                 //we an update, signal the fix
-                taskExecutor.submit(entityVersionCreatedFactory.getTask(collectionScope,entity));
+                taskExecutor.submit(entityVersionCreatedFactory.getTask(collectionScope, entity));
 
                 //TODO T.N Change this to fire a task
                 //                Observable.from( new CollectionIoEvent<Id>(collectionScope,
@@ -328,7 +381,13 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
 
             }
-        } ).doOnError( rollback );
+        }).doOnError(rollback)
+            .doOnCompleted(new Action0() {
+                @Override
+                public void call() {
+                    timer.stop();
+                }
+            });
     }
 
 
@@ -362,6 +421,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     @Override
     public Observable<VersionSet> getLatestVersion( final Collection<Id> entityIds ) {
 
+        final Timer.Context timer = getLatestTimer.time();
         return Observable.create( new Observable.OnSubscribe<VersionSet>() {
 
             @Override
@@ -377,7 +437,13 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                     subscriber.onError( e );
                 }
             }
-        } );
+        } )
+            .doOnCompleted(new Action0() {
+                @Override
+                public void call() {
+                    timer.stop();
+                }
+            });
     }