You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/18 21:56:07 UTC
[07/50] incubator-usergrid git commit: graphtimers
graphtimers
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/56c57018
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/56c57018
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/56c57018
Branch: refs/heads/USERGRID-460
Commit: 56c57018d3b4e8eb391e59bbecffb09fb6c1f1ea
Parents: b3e42dd
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Mar 11 08:36:23 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Mar 11 08:36:23 2015 -0600
----------------------------------------------------------------------
stack/corepersistence/graph/pom.xml | 28 +-
.../graph/impl/GraphManagerImpl.java | 288 +++++++++++++++++--
stack/pom.xml | 1 -
3 files changed, 273 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c57018/stack/corepersistence/graph/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/pom.xml b/stack/corepersistence/graph/pom.xml
index e448e74..b4cb45f 100644
--- a/stack/corepersistence/graph/pom.xml
+++ b/stack/corepersistence/graph/pom.xml
@@ -66,13 +66,17 @@
</dependency>
-
- <dependency>
- <groupId>com.codahale.metrics</groupId>
- <artifactId>metrics-core</artifactId>
- <version>3.0.2</version>
- <scope>test</scope>
- </dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-graphite</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
<dependency>
<groupId>org.mockito</groupId>
@@ -88,16 +92,16 @@
<build>
<plugins>
-
+
<!-- <plugin>
<groupId>org.safehaus.chop</groupId>
<artifactId>chop-maven-plugin</artifactId>
<version>${chop.version}</version>
-
+
NOTE: you should be putting most of these variables into your settings.xml
as an automatically activated profile.
-
+
<configuration>
<accessKey>${aws.s3.key}</accessKey>
@@ -119,11 +123,11 @@
<runnerCount>6</runnerCount>
<runnerName>${runner.name}</runnerName>
<securityGroupExceptions>
-
+
Add your own IP address as an exception to allow access
but please do this in the settings.xml file .. essentially
all parameters should be in the settings.xml file.
-
+
<param>${myip.address}/32:24981</param>
<param>${myip.address}/32:22</param>
</securityGroupExceptions>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c57018/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index df10816..53e116d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -25,6 +25,9 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,9 +59,12 @@ import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.netflix.astyanax.MutationBatch;
+import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
+import rx.functions.Action0;
+import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
@@ -81,6 +87,30 @@ public class GraphManagerImpl implements GraphManager {
private final EdgeDeleteListener edgeDeleteListener;
private final NodeDeleteListener nodeDeleteListener;
+ private final Timer writeEdgeTimer;
+ private final Meter writeEdgeMeter;
+ private final Meter deleteEdgeMeter;
+ private final Timer deleteEdgeTimer;
+ private final Meter deleteNodeMeter;
+ private final Timer deleteNodeTimer;
+ private final Meter loadEdgesFromSourceMeter;
+ private final Timer loadEdgesFromSourceTimer;
+ private final Meter loadEdgesToTargetMeter;
+ private final Timer loadEdgesToTargetTimer;
+ private final Meter loadEdgesVersionsMeter;
+ private final Timer loadEdgesVersionsTimer;
+ private final Meter loadEdgesFromSourceByTypeMeter;
+ private final Timer loadEdgesFromSourceByTypeTimer;
+ private final Meter loadEdgesToTargetByTypeMeter;
+ private final Timer loadEdgesToTargetByTypeTimer;
+ private final Timer getEdgeTypesFromSourceTimer;
+ private final Meter getEdgeTypesFromSourceMeter;
+ private final Timer getIdTypesFromSourceTimer;
+ private final Meter getIdTypesFromSourceMeter;
+ private final Meter getEdgeTypesToTargetMeter;
+ private final Timer getEdgeTypesToTargetTimer;
+ private final Timer getIdTypesToTargetTimer;
+ private final Meter getIdTypesToTargetMeter;
private Observer<Integer> edgeDeleteSubcriber;
private Observer<Integer> nodeDelete;
@@ -96,7 +126,8 @@ public class GraphManagerImpl implements GraphManager {
final GraphFig graphFig,
final EdgeDeleteListener edgeDeleteListener,
final NodeDeleteListener nodeDeleteListener,
- @Assisted final ApplicationScope scope) {
+ @Assisted final ApplicationScope scope,
+ MetricsFactory metricsFactory) {
ValidationUtils.validateApplicationScope( scope );
@@ -117,6 +148,36 @@ public class GraphManagerImpl implements GraphManager {
this.edgeDeleteSubcriber = MetricSubscriber.INSTANCE;
this.nodeDelete = MetricSubscriber.INSTANCE;
+ this.writeEdgeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "write.edge.meter");
+ this.writeEdgeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "write.edge.timer");
+ this.deleteEdgeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "delete.edge.meter");
+ this.deleteEdgeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "delete.edge.timer");
+ this.deleteNodeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "delete.node.meter");
+ this.deleteNodeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "delete.node.timer");
+ this.loadEdgesFromSourceMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.from.meter");
+ this.loadEdgesFromSourceTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.from.timer");
+ this.loadEdgesToTargetMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.to.meter");
+ this.loadEdgesToTargetTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.to.timer");
+ this.loadEdgesVersionsMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.versions.meter");
+ this.loadEdgesVersionsTimer = metricsFactory.getTimer(GraphManagerImpl.class,"load.versions.timer");
+ this.loadEdgesFromSourceByTypeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.from.type.meter");
+ this.loadEdgesFromSourceByTypeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.from.type.timer");
+ this.loadEdgesToTargetByTypeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.to.type.meter");
+ this.loadEdgesToTargetByTypeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.to.type.timer");
+
+ this.getEdgeTypesFromSourceTimer = metricsFactory.getTimer(GraphManagerImpl.class,"get.edge.from.timer");
+ this.getEdgeTypesFromSourceMeter = metricsFactory.getMeter(GraphManagerImpl.class, "write.edge.meter");
+
+ this.getIdTypesFromSourceTimer = metricsFactory.getTimer(GraphManagerImpl.class,"get.idtype.from.timer");
+ this.getIdTypesFromSourceMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.idtype.from.meter");
+
+ this.getEdgeTypesToTargetTimer = metricsFactory.getTimer(GraphManagerImpl.class,"get.edge.to.timer");
+ this.getEdgeTypesToTargetMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.edge.to.meter");
+
+ this.getIdTypesToTargetTimer = metricsFactory.getTimer(GraphManagerImpl.class, "get.idtype.to.timer");
+ this.getIdTypesToTargetMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.idtype.to.meter");
+
+
}
@@ -125,6 +186,8 @@ public class GraphManagerImpl implements GraphManager {
GraphValidation.validateEdge( edge );
final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, false );
+ final Timer.Context timer = writeEdgeTimer.time();
+ final Meter meter = writeEdgeMeter;
return Observable.from( markedEdge ).map( new Func1<MarkedEdge, Edge>() {
@Override
@@ -143,45 +206,72 @@ public class GraphManagerImpl implements GraphManager {
return edge;
}
- } );
+ } )
+ .doOnEach(new Action1<Notification<? super Edge>>() {
+ @Override
+ public void call(Notification<? super Edge> notification) {
+ meter.mark();
+ }
+ })
+ .doOnCompleted(new Action0() {
+ @Override
+ public void call() {
+ timer.stop();
+ }
+ });
}
@Override
public Observable<Edge> deleteEdge( final Edge edge ) {
- GraphValidation.validateEdge( edge );
+ GraphValidation.validateEdge(edge);
- final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, true );
+ final MarkedEdge markedEdge = new SimpleMarkedEdge(edge, true);
-
- return Observable.from( markedEdge ).map( new Func1<MarkedEdge, Edge>() {
+ final Timer.Context timer = deleteEdgeTimer.time();
+ final Meter meter = deleteEdgeMeter;
+ return Observable.from(markedEdge).map(new Func1<MarkedEdge, Edge>() {
@Override
- public Edge call( final MarkedEdge edge ) {
+ public Edge call(final MarkedEdge edge) {
final UUID timestamp = UUIDGenerator.newTimeUUID();
- final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge( scope, edge, timestamp );
+ final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge(scope, edge, timestamp);
- LOG.debug( "Marking edge {} as deleted to commit log", edge );
- HystrixCassandra.user( edgeMutation );
+ LOG.debug("Marking edge {} as deleted to commit log", edge);
+ HystrixCassandra.user(edgeMutation);
//HystrixCassandra.async( edgeDeleteListener.receive( scope, markedEdge,
// timestamp )).subscribeOn( Schedulers.io() ).subscribe( edgeDeleteSubcriber );
- edgeDeleteListener.receive( scope, markedEdge, timestamp ).subscribeOn( Schedulers.io() )
- .subscribe( edgeDeleteSubcriber );
+ edgeDeleteListener.receive(scope, markedEdge, timestamp).subscribeOn(Schedulers.io())
+ .subscribe(edgeDeleteSubcriber);
return edge;
}
- } );
+ })
+ .doOnEach(new Action1<Notification<? super Edge>>() {
+ @Override
+ public void call(Notification<? super Edge> notification) {
+ meter.mark();
+ }
+ })
+ .doOnCompleted(new Action0() {
+ @Override
+ public void call() {
+ timer.stop();
+ }
+ });
}
@Override
public Observable<Id> deleteNode( final Id node, final long timestamp ) {
+ final Timer.Context timer = deleteNodeTimer.time();
+ final Meter meter = deleteNodeMeter;
return Observable.from( node ).map( new Func1<Id, Id>() {
@Override
public Id call( final Id id ) {
@@ -205,114 +295,250 @@ public class GraphManagerImpl implements GraphManager {
return id;
}
- } );
+ } )
+ .doOnEach(new Action1<Notification<? super Id>>() {
+ @Override
+ public void call(Notification<? super Id> notification) {
+ meter.mark();
+ }
+ })
+ .doOnCompleted(new Action0() {
+ @Override
+ public void call() {
+ timer.stop();
+ }
+ });
}
@Override
public Observable<Edge> loadEdgeVersions( final SearchByEdge searchByEdge ) {
+ final Timer.Context timer = loadEdgesVersionsTimer.time();
+ final Meter meter = loadEdgesVersionsMeter;
return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
return storageEdgeSerialization.getEdgeVersions( scope, searchByEdge );
}
- } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( searchByEdge.getMaxTimestamp() ) )
- .cast( Edge.class );
+ } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(searchByEdge.getMaxTimestamp()))
+ .cast(Edge.class)
+ .doOnEach(new Action1<Notification<? super Edge>>() {
+ @Override
+ public void call(Notification<? super Edge> notification) {
+ meter.mark();
+ }
+ })
+ .doOnCompleted(new Action0() {
+ @Override
+ public void call() {
+ timer.stop();
+ }
+ });
}
@Override
public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
+ final Timer.Context timer = loadEdgesFromSourceTimer.time();
+ final Meter meter = loadEdgesFromSourceMeter;
return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
return storageEdgeSerialization.getEdgesFromSource( scope, search );
}
- } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxTimestamp() ) )
- .cast( Edge.class );
+ } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp()))
+ .cast(Edge.class)
+ .doOnEach(new Action1<Notification<? super Edge>>() {
+ @Override
+ public void call(Notification<? super Edge> notification) {
+ meter.mark();
+ }
+ })
+ .doOnCompleted(new Action0() {
+ @Override
+ public void call() {
+ timer.stop();
+ }
+ });
}
@Override
public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
+ final Timer.Context timer = loadEdgesToTargetTimer.time();
+ final Meter meter = loadEdgesToTargetMeter;
return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
return storageEdgeSerialization.getEdgesToTarget( scope, search );
}
- } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxTimestamp() ) )
- .cast( Edge.class );
+ } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp()))
+ .cast(Edge.class)
+ .doOnEach(new Action1<Notification<? super Edge>>() {
+ @Override
+ public void call(Notification<? super Edge> notification) {
+ meter.mark();
+ }
+ })
+ .doOnCompleted(new Action0() {
+ @Override
+ public void call() {
+ timer.stop();
+ }
+ });
}
@Override
public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
+ final Timer.Context timer = loadEdgesFromSourceByTypeTimer.time();
+ final Meter meter = loadEdgesFromSourceByTypeMeter;
return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search );
}
- } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxTimestamp() ) )
-
- .cast( Edge.class );
+ } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp()))
+
+ .cast(Edge.class)
+ .doOnEach(new Action1<Notification<? super Edge>>() {
+ @Override
+ public void call(Notification<? super Edge> notification) {
+ meter.mark();
+ }
+ })
+ .doOnCompleted(new Action0() {
+ @Override
+ public void call() {
+ timer.stop();
+ }
+ });
}
@Override
public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
+ final Timer.Context timer = loadEdgesToTargetByTypeTimer.time();
+ final Meter meter = loadEdgesToTargetByTypeMeter;
return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search );
}
- } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxTimestamp() ) )
- .cast( Edge.class );
+ } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp()))
+ .cast(Edge.class)
+ .doOnEach(new Action1<Notification<? super Edge>>() {
+ @Override
+ public void call(Notification<? super Edge> notification) {
+ meter.mark();
+ }
+ })
+ .doOnCompleted(new Action0() {
+ @Override
+ public void call() {
+ timer.stop();
+ }
+ });
}
@Override
public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) {
-
+ final Timer.Context timer = getEdgeTypesFromSourceTimer.time();
+ final Meter meter = getEdgeTypesFromSourceMeter;
return Observable.create( new ObservableIterator<String>( "getEdgeTypesFromSource" ) {
@Override
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
}
- } );
+ } )
+ .doOnEach(new Action1<Notification<? super String>>() {
+ @Override
+ public void call(Notification<? super String> notification) {
+ meter.mark();
+ }
+ })
+ .doOnCompleted(new Action0() {
+ @Override
+ public void call() {
+ timer.stop();
+ }
+ });
}
@Override
public Observable<String> getIdTypesFromSource( final SearchIdType search ) {
+ final Timer.Context timer = getIdTypesFromSourceTimer.time();
+ final Meter meter = getIdTypesFromSourceMeter;
return Observable.create( new ObservableIterator<String>( "getIdTypesFromSource" ) {
@Override
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getIdTypesFromSource( scope, search );
}
- } );
+ } )
+ .doOnEach(new Action1<Notification<? super String>>() {
+ @Override
+ public void call(Notification<? super String> notification) {
+ meter.mark();
+ }
+ })
+ .doOnCompleted(new Action0() {
+ @Override
+ public void call() {
+ timer.stop();
+ }
+ });
}
@Override
public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) {
-
+ final Timer.Context timer = getEdgeTypesToTargetTimer.time();
+ final Meter meter = getEdgeTypesToTargetMeter;
return Observable.create( new ObservableIterator<String>( "getEdgeTypesToTarget" ) {
@Override
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
}
- } );
+ } )
+ .doOnEach(new Action1<Notification<? super String>>() {
+ @Override
+ public void call(Notification<? super String> notification) {
+ meter.mark();
+ }
+ })
+ .doOnCompleted(new Action0() {
+ @Override
+ public void call() {
+ timer.stop();
+ }
+ });
}
@Override
public Observable<String> getIdTypesToTarget( final SearchIdType search ) {
+ final Timer.Context timer = getIdTypesToTargetTimer.time();
+ final Meter meter = getIdTypesToTargetMeter;
return Observable.create( new ObservableIterator<String>( "getIdTypesToTarget" ) {
@Override
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getIdTypesToTarget( scope, search );
}
- } );
+ } )
+ .doOnEach(new Action1<Notification<? super String>>() {
+ @Override
+ public void call(Notification<? super String> notification) {
+ meter.mark();
+ }
+ })
+ .doOnCompleted(new Action0() {
+ @Override
+ public void call() {
+ timer.stop();
+ }
+ });
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c57018/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index 25b2258..73f2cbd 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -109,7 +109,6 @@
<jersey-version>1.18.1</jersey-version>
<junit-version>4.12</junit-version>
<log4j-version>1.2.16</log4j-version>
- <metrics-version>2.1.2</metrics-version>
<org.springframework.version>3.1.2.RELEASE</org.springframework.version>
<shiro-version>1.2.3</shiro-version>
<slf4j-version>1.6.1</slf4j-version>