You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/18 21:56:07 UTC

[07/50] incubator-usergrid git commit: graphtimers

graphtimers


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/56c57018
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/56c57018
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/56c57018

Branch: refs/heads/USERGRID-460
Commit: 56c57018d3b4e8eb391e59bbecffb09fb6c1f1ea
Parents: b3e42dd
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Mar 11 08:36:23 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Mar 11 08:36:23 2015 -0600

----------------------------------------------------------------------
 stack/corepersistence/graph/pom.xml             |  28 +-
 .../graph/impl/GraphManagerImpl.java            | 288 +++++++++++++++++--
 stack/pom.xml                                   |   1 -
 3 files changed, 273 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c57018/stack/corepersistence/graph/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/pom.xml b/stack/corepersistence/graph/pom.xml
index e448e74..b4cb45f 100644
--- a/stack/corepersistence/graph/pom.xml
+++ b/stack/corepersistence/graph/pom.xml
@@ -66,13 +66,17 @@
     </dependency>
 
 
-
-    <dependency>
-        <groupId>com.codahale.metrics</groupId>
-        <artifactId>metrics-core</artifactId>
-        <version>3.0.2</version>
-        <scope>test</scope>
-    </dependency>
+      <dependency>
+          <groupId>com.codahale.metrics</groupId>
+          <artifactId>metrics-core</artifactId>
+          <version>${metrics.version}</version>
+      </dependency>
+
+      <dependency>
+          <groupId>com.codahale.metrics</groupId>
+          <artifactId>metrics-graphite</artifactId>
+          <version>${metrics.version}</version>
+      </dependency>
 
     <dependency>
       <groupId>org.mockito</groupId>
@@ -88,16 +92,16 @@
 
   <build>
     <plugins>
-        
+
 <!--      <plugin>
         <groupId>org.safehaus.chop</groupId>
         <artifactId>chop-maven-plugin</artifactId>
         <version>${chop.version}</version>
 
-        
+
         NOTE: you should be putting most of these variables into your settings.xml
         as an automatically activated profile.
-        
+
 
         <configuration>
           <accessKey>${aws.s3.key}</accessKey>
@@ -119,11 +123,11 @@
           <runnerCount>6</runnerCount>
           <runnerName>${runner.name}</runnerName>
           <securityGroupExceptions>
-            
+
             Add your own IP address as an exception to allow access
             but please do this in the settings.xml file .. essentially
             all parameters should be in the settings.xml file.
-            
+
             <param>${myip.address}/32:24981</param>
             <param>${myip.address}/32:22</param>
           </securityGroupExceptions>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c57018/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index df10816..53e116d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -25,6 +25,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,9 +59,12 @@ import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.netflix.astyanax.MutationBatch;
 
+import rx.Notification;
 import rx.Observable;
 import rx.Observer;
 import rx.Subscriber;
+import rx.functions.Action0;
+import rx.functions.Action1;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
@@ -81,6 +87,30 @@ public class GraphManagerImpl implements GraphManager {
 
     private final EdgeDeleteListener edgeDeleteListener;
     private final NodeDeleteListener nodeDeleteListener;
+    private final Timer writeEdgeTimer;
+    private final Meter writeEdgeMeter;
+    private final Meter deleteEdgeMeter;
+    private final Timer deleteEdgeTimer;
+    private final Meter deleteNodeMeter;
+    private final Timer deleteNodeTimer;
+    private final Meter loadEdgesFromSourceMeter;
+    private final Timer loadEdgesFromSourceTimer;
+    private final Meter loadEdgesToTargetMeter;
+    private final Timer loadEdgesToTargetTimer;
+    private final Meter loadEdgesVersionsMeter;
+    private final Timer loadEdgesVersionsTimer;
+    private final Meter loadEdgesFromSourceByTypeMeter;
+    private final Timer loadEdgesFromSourceByTypeTimer;
+    private final Meter loadEdgesToTargetByTypeMeter;
+    private final Timer loadEdgesToTargetByTypeTimer;
+    private final Timer getEdgeTypesFromSourceTimer;
+    private final Meter getEdgeTypesFromSourceMeter;
+    private final Timer getIdTypesFromSourceTimer;
+    private final Meter getIdTypesFromSourceMeter;
+    private final Meter getEdgeTypesToTargetMeter;
+    private final Timer getEdgeTypesToTargetTimer;
+    private final Timer getIdTypesToTargetTimer;
+    private final Meter getIdTypesToTargetMeter;
 
     private Observer<Integer> edgeDeleteSubcriber;
     private Observer<Integer> nodeDelete;
@@ -96,7 +126,8 @@ public class GraphManagerImpl implements GraphManager {
                              final GraphFig graphFig,
                              final EdgeDeleteListener edgeDeleteListener,
                              final NodeDeleteListener nodeDeleteListener,
-                             @Assisted final ApplicationScope scope) {
+                             @Assisted final ApplicationScope scope,
+                             MetricsFactory metricsFactory) {
 
 
         ValidationUtils.validateApplicationScope( scope );
@@ -117,6 +148,36 @@ public class GraphManagerImpl implements GraphManager {
 
         this.edgeDeleteSubcriber = MetricSubscriber.INSTANCE;
         this.nodeDelete = MetricSubscriber.INSTANCE;
+        this.writeEdgeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "write.edge.meter");
+        this.writeEdgeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "write.edge.timer");
+        this.deleteEdgeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "delete.edge.meter");
+        this.deleteEdgeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "delete.edge.timer");
+        this.deleteNodeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "delete.node.meter");
+        this.deleteNodeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "delete.node.timer");
+        this.loadEdgesFromSourceMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.from.meter");
+        this.loadEdgesFromSourceTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.from.timer");
+        this.loadEdgesToTargetMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.to.meter");
+        this.loadEdgesToTargetTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.to.timer");
+        this.loadEdgesVersionsMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.versions.meter");
+        this.loadEdgesVersionsTimer = metricsFactory.getTimer(GraphManagerImpl.class,"load.versions.timer");
+        this.loadEdgesFromSourceByTypeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.from.type.meter");
+        this.loadEdgesFromSourceByTypeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.from.type.timer");
+        this.loadEdgesToTargetByTypeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.to.type.meter");
+        this.loadEdgesToTargetByTypeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.to.type.timer");
+        // Metrics for getEdgeTypesFromSource; the meter is registered under a name that parallels its timer.
+        this.getEdgeTypesFromSourceTimer = metricsFactory.getTimer(GraphManagerImpl.class,"get.edge.from.timer");
+        this.getEdgeTypesFromSourceMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.edge.from.meter");
+
+        this.getIdTypesFromSourceTimer = metricsFactory.getTimer(GraphManagerImpl.class,"get.idtype.from.timer");
+        this.getIdTypesFromSourceMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.idtype.from.meter");
+
+        this.getEdgeTypesToTargetTimer = metricsFactory.getTimer(GraphManagerImpl.class,"get.edge.to.timer");
+        this.getEdgeTypesToTargetMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.edge.to.meter");
+
+        this.getIdTypesToTargetTimer = metricsFactory.getTimer(GraphManagerImpl.class, "get.idtype.to.timer");
+        this.getIdTypesToTargetMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.idtype.to.meter");
+
+
     }
 
 
@@ -125,6 +186,8 @@ public class GraphManagerImpl implements GraphManager {
         GraphValidation.validateEdge( edge );
 
         final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, false );
+        final Timer.Context timer = writeEdgeTimer.time();
+        final Meter meter = writeEdgeMeter;
 
         return Observable.from( markedEdge ).map( new Func1<MarkedEdge, Edge>() {
             @Override
@@ -143,45 +206,72 @@ public class GraphManagerImpl implements GraphManager {
 
                 return edge;
             }
-        } );
+        } )
+            .doOnEach(new Action1<Notification<? super Edge>>() {
+                @Override
+                public void call(Notification<? super Edge> notification) {
+                    meter.mark();
+                }
+            })
+            .doOnTerminate(new Action0() {
+                @Override
+                public void call() {
+                    timer.stop(); // runs on onCompleted and onError, so failed writes are timed too
+                }
+            });
     }
 
 
     @Override
     public Observable<Edge> deleteEdge( final Edge edge ) {
-        GraphValidation.validateEdge( edge );
+        GraphValidation.validateEdge(edge);
 
-        final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, true );
+        final MarkedEdge markedEdge = new SimpleMarkedEdge(edge, true);
 
-
-        return Observable.from( markedEdge ).map( new Func1<MarkedEdge, Edge>() {
+        final Timer.Context timer = deleteEdgeTimer.time();
+        final Meter meter = deleteEdgeMeter;
+        return Observable.from(markedEdge).map(new Func1<MarkedEdge, Edge>() {
             @Override
-            public Edge call( final MarkedEdge edge ) {
+            public Edge call(final MarkedEdge edge) {
 
                 final UUID timestamp = UUIDGenerator.newTimeUUID();
 
 
-                final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge( scope, edge, timestamp );
+                final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge(scope, edge, timestamp);
 
 
-                LOG.debug( "Marking edge {} as deleted to commit log", edge );
-                HystrixCassandra.user( edgeMutation );
+                LOG.debug("Marking edge {} as deleted to commit log", edge);
+                HystrixCassandra.user(edgeMutation);
 
 
                 //HystrixCassandra.async( edgeDeleteListener.receive( scope, markedEdge,
                 // timestamp )).subscribeOn( Schedulers.io() ).subscribe( edgeDeleteSubcriber );
-                edgeDeleteListener.receive( scope, markedEdge, timestamp ).subscribeOn( Schedulers.io() )
-                                  .subscribe( edgeDeleteSubcriber );
+                edgeDeleteListener.receive(scope, markedEdge, timestamp).subscribeOn(Schedulers.io())
+                    .subscribe(edgeDeleteSubcriber);
 
 
                 return edge;
             }
-        } );
+        })
+            .doOnEach(new Action1<Notification<? super Edge>>() {
+                @Override
+                public void call(Notification<? super Edge> notification) {
+                    meter.mark();
+                }
+            })
+            .doOnTerminate(new Action0() {
+                @Override
+                public void call() {
+                    timer.stop(); // runs on onCompleted and onError, so failed deletes are timed too
+                }
+            });
     }
 
 
     @Override
     public Observable<Id> deleteNode( final Id node, final long timestamp ) {
+        final Timer.Context timer = deleteNodeTimer.time();
+        final Meter meter = deleteNodeMeter;
         return Observable.from( node ).map( new Func1<Id, Id>() {
             @Override
             public Id call( final Id id ) {
@@ -205,114 +295,250 @@ public class GraphManagerImpl implements GraphManager {
 
                 return id;
             }
-        } );
+        } )
+            .doOnEach(new Action1<Notification<? super Id>>() {
+                @Override
+                public void call(Notification<? super Id> notification) {
+                    meter.mark();
+                }
+            })
+            .doOnTerminate(new Action0() {
+                @Override
+                public void call() {
+                    timer.stop(); // runs on onCompleted and onError, so failed deletes are timed too
+                }
+            });
     }
 
 
     @Override
     public Observable<Edge> loadEdgeVersions( final SearchByEdge searchByEdge ) {
+        final Timer.Context timer = loadEdgesVersionsTimer.time();
+        final Meter meter = loadEdgesVersionsMeter;
         return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return storageEdgeSerialization.getEdgeVersions( scope, searchByEdge );
             }
-        } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( searchByEdge.getMaxTimestamp() ) )
-                         .cast( Edge.class );
+        } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(searchByEdge.getMaxTimestamp()))
+                         .cast(Edge.class)
+            .doOnEach(new Action1<Notification<? super Edge>>() {
+                @Override
+                public void call(Notification<? super Edge> notification) {
+                    meter.mark();
+                }
+            })
+            .doOnTerminate(new Action0() {
+                @Override
+                public void call() {
+                    timer.stop(); // runs on onCompleted and onError, so failed loads are timed too
+                }
+            });
     }
 
 
     @Override
     public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
+        final Timer.Context timer = loadEdgesFromSourceTimer.time();
+        final Meter meter = loadEdgesFromSourceMeter;
         return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return storageEdgeSerialization.getEdgesFromSource( scope, search );
             }
-        } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxTimestamp() ) )
-                         .cast( Edge.class );
+        } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp()))
+                         .cast(Edge.class)
+            .doOnEach(new Action1<Notification<? super Edge>>() {
+                @Override
+                public void call(Notification<? super Edge> notification) {
+                    meter.mark();
+                }
+            })
+            .doOnTerminate(new Action0() {
+                @Override
+                public void call() {
+                    timer.stop(); // runs on onCompleted and onError, so failed loads are timed too
+                }
+            });
     }
 
 
     @Override
     public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
+        final Timer.Context timer = loadEdgesToTargetTimer.time();
+        final Meter meter = loadEdgesToTargetMeter;
         return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return storageEdgeSerialization.getEdgesToTarget( scope, search );
             }
-        } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxTimestamp() ) )
-                         .cast( Edge.class );
+        } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp()))
+                         .cast(Edge.class)
+            .doOnEach(new Action1<Notification<? super Edge>>() {
+                @Override
+                public void call(Notification<? super Edge> notification) {
+                    meter.mark();
+                }
+            })
+            .doOnTerminate(new Action0() {
+                @Override
+                public void call() {
+                    timer.stop(); // runs on onCompleted and onError, so failed loads are timed too
+                }
+            });
     }
 
 
     @Override
     public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
+        final Timer.Context timer = loadEdgesFromSourceByTypeTimer.time();
+        final Meter meter = loadEdgesFromSourceByTypeMeter;
         return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search );
             }
-        } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxTimestamp() ) )
-
-                         .cast( Edge.class );
+        } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp()))
+
+                         .cast(Edge.class)
+            .doOnEach(new Action1<Notification<? super Edge>>() {
+                @Override
+                public void call(Notification<? super Edge> notification) {
+                    meter.mark();
+                }
+            })
+            .doOnTerminate(new Action0() {
+                @Override
+                public void call() {
+                    timer.stop(); // runs on onCompleted and onError, so failed loads are timed too
+                }
+            });
     }
 
 
     @Override
     public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
+        final Timer.Context timer = loadEdgesToTargetByTypeTimer.time();
+        final Meter meter = loadEdgesToTargetByTypeMeter;
         return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search );
             }
-        } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxTimestamp() ) )
-                         .cast( Edge.class );
+        } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp()))
+                         .cast(Edge.class)
+            .doOnEach(new Action1<Notification<? super Edge>>() {
+                @Override
+                public void call(Notification<? super Edge> notification) {
+                    meter.mark();
+                }
+            })
+            .doOnTerminate(new Action0() {
+                @Override
+                public void call() {
+                    timer.stop(); // runs on onCompleted and onError, so failed loads are timed too
+                }
+            });
     }
 
 
     @Override
     public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) {
-
+        final Timer.Context timer = getEdgeTypesFromSourceTimer.time();
+        final Meter meter = getEdgeTypesFromSourceMeter;
         return Observable.create( new ObservableIterator<String>( "getEdgeTypesFromSource" ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
             }
-        } );
+        } )
+            .doOnEach(new Action1<Notification<? super String>>() {
+                @Override
+                public void call(Notification<? super String> notification) {
+                    meter.mark();
+                }
+            })
+            .doOnTerminate(new Action0() {
+                @Override
+                public void call() {
+                    timer.stop(); // runs on onCompleted and onError, so failed lookups are timed too
+                }
+            });
     }
 
 
     @Override
     public Observable<String> getIdTypesFromSource( final SearchIdType search ) {
+        final Timer.Context timer = getIdTypesFromSourceTimer.time();
+        final Meter meter = getIdTypesFromSourceMeter;
         return Observable.create( new ObservableIterator<String>( "getIdTypesFromSource" ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getIdTypesFromSource( scope, search );
             }
-        } );
+        } )
+            .doOnEach(new Action1<Notification<? super String>>() {
+                @Override
+                public void call(Notification<? super String> notification) {
+                    meter.mark();
+                }
+            })
+            .doOnTerminate(new Action0() {
+                @Override
+                public void call() {
+                    timer.stop(); // runs on onCompleted and onError, so failed lookups are timed too
+                }
+            });
     }
 
 
     @Override
     public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) {
-
+        final Timer.Context timer = getEdgeTypesToTargetTimer.time();
+        final Meter meter = getEdgeTypesToTargetMeter;
         return Observable.create( new ObservableIterator<String>( "getEdgeTypesToTarget" ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
             }
-        } );
+        } )
+            .doOnEach(new Action1<Notification<? super String>>() {
+                @Override
+                public void call(Notification<? super String> notification) {
+                    meter.mark();
+                }
+            })
+            .doOnTerminate(new Action0() {
+                @Override
+                public void call() {
+                    timer.stop(); // runs on onCompleted and onError, so failed lookups are timed too
+                }
+            });
     }
 
 
     @Override
     public Observable<String> getIdTypesToTarget( final SearchIdType search ) {
+        final Timer.Context timer = getIdTypesToTargetTimer.time();
+        final Meter meter = getIdTypesToTargetMeter;
         return Observable.create( new ObservableIterator<String>( "getIdTypesToTarget" ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getIdTypesToTarget( scope, search );
             }
-        } );
+        } )
+            .doOnEach(new Action1<Notification<? super String>>() {
+                @Override
+                public void call(Notification<? super String> notification) {
+                    meter.mark();
+                }
+            })
+            .doOnTerminate(new Action0() {
+                @Override
+                public void call() {
+                    timer.stop(); // runs on onCompleted and onError, so failed lookups are timed too
+                }
+            });
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c57018/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index 25b2258..73f2cbd 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -109,7 +109,6 @@
       <jersey-version>1.18.1</jersey-version>
       <junit-version>4.12</junit-version>
       <log4j-version>1.2.16</log4j-version>
-      <metrics-version>2.1.2</metrics-version>
       <org.springframework.version>3.1.2.RELEASE</org.springframework.version>
       <shiro-version>1.2.3</shiro-version>
       <slf4j-version>1.6.1</slf4j-version>