You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/03 01:21:50 UTC
[02/27] incubator-usergrid git commit: Adding metrics to cp
Adding metrics to cp
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7e0e0be8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7e0e0be8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7e0e0be8
Branch: refs/heads/USERGRID-280
Commit: 7e0e0be85d2cb65990ca4f8f4dfcd09330f47a04
Parents: db40139
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Feb 17 15:35:25 2015 -0800
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Feb 17 15:35:25 2015 -0800
----------------------------------------------------------------------
stack/corepersistence/common/pom.xml | 12 ++
.../persistence/core/guice/CommonModule.java | 6 +-
.../core/metrics/MetricsFactory.java | 34 ++++++
.../core/metrics/MetricsFactoryImpl.java | 111 +++++++++++++++++++
.../persistence/core/metrics/MetricsFig.java | 33 ++++++
stack/corepersistence/pom.xml | 1 +
.../usergrid/persistence/index/IndexFig.java | 11 ++
.../index/impl/IndexBatchBufferImpl.java | 55 ++++++---
.../persistence/index/impl/EntityIndexTest.java | 5 +-
9 files changed, 245 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7e0e0be8/stack/corepersistence/common/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/pom.xml b/stack/corepersistence/common/pom.xml
index 42b2996..82df1d8 100644
--- a/stack/corepersistence/common/pom.xml
+++ b/stack/corepersistence/common/pom.xml
@@ -151,6 +151,18 @@
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-graphite</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7e0e0be8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index faa4e39..33321a4 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -19,6 +19,9 @@
package org.apache.usergrid.persistence.core.guice;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactoryImpl;
+import org.apache.usergrid.persistence.core.metrics.MetricsFig;
import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.persistence.core.astyanax.AstyanaxKeyspaceProvider;
@@ -71,7 +74,6 @@ public class CommonModule extends AbstractModule {
bind( CassandraConfig.class ).to( CassandraConfigImpl.class );
-
/**
* Data migration beans
*/
@@ -79,7 +81,9 @@ public class CommonModule extends AbstractModule {
bind( DataMigrationManager.class ).to( DataMigrationManagerImpl.class );
+ bind( MetricsFactory.class ).to( MetricsFactoryImpl.class );
+ install(new GuicyFigModule(MetricsFig.class));
//do multibindings for migrations
Multibinder<DataMigration> dataMigrationMultibinder = Multibinder.newSetBinder( binder(), DataMigration.class );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7e0e0be8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactory.java
new file mode 100644
index 0000000..453e556
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.core.metrics;
+
+import com.codahale.metrics.*;
+
+/**
+ * Classy class class.
+ */
+public interface MetricsFactory {
+ MetricRegistry getRegistry();
+
+ Timer getTimer(Class<?> klass, String name);
+
+ Histogram getHistogram(Class<?> klass, String name);
+
+ Counter getCounter(Class<?> klass, String name);
+
+ Meter getMeter(Class<?> klass, String name);
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7e0e0be8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java
new file mode 100644
index 0000000..201ee0f
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.core.metrics;
+
+
+import com.codahale.metrics.*;
+import com.codahale.metrics.graphite.Graphite;
+import com.codahale.metrics.graphite.GraphiteReporter;
+import com.google.inject.Singleton;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Singleton class to manage metrics.
+ */
+@Singleton
+public class MetricsFactoryImpl implements MetricsFactory {
+
+ private final MetricsFig metricsFig;
+ private MetricRegistry registry;
+ private GraphiteReporter graphiteReporter;
+ private JmxReporter jmxReporter;
+ private ConcurrentHashMap<String,Metric> hashMap;
+ private static final Logger LOG = LoggerFactory.getLogger(MetricsFactoryImpl.class);
+
+ public MetricsFactoryImpl(MetricsFig metricsFig) {
+ this.metricsFig = metricsFig;
+ registry = new MetricRegistry();
+ String metricsHost = metricsFig.getHost();
+ if(!metricsHost.equals("false")) {
+ Graphite graphite = new Graphite(new InetSocketAddress(metricsHost, 2003));
+ graphiteReporter = GraphiteReporter.forRegistry(registry)
+ .prefixedWith("notifications")
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .filter(MetricFilter.ALL)
+ .build(graphite);
+ graphiteReporter.start(30, TimeUnit.SECONDS);
+ }else {
+ LOG.warn("MetricsService:Logger not started.");
+ }
+ hashMap = new ConcurrentHashMap<String, Metric>();
+
+ jmxReporter = JmxReporter.forRegistry(registry).build();
+ jmxReporter.start();
+ }
+
+ @Override
+ public MetricRegistry getRegistry() {
+ return registry;
+ }
+
+ @Override
+ public Timer getTimer(Class<?> klass, String name) {
+ return getMetric(Timer.class, klass, name);
+ }
+
+ @Override
+ public Histogram getHistogram(Class<?> klass, String name) {
+ return getMetric(Histogram.class, klass, name);
+ }
+
+ @Override
+ public Counter getCounter(Class<?> klass, String name) {
+ return getMetric(Counter.class, klass, name);
+ }
+
+ @Override
+ public Meter getMeter(Class<?> klass, String name) {
+ return getMetric(Meter.class, klass, name);
+ }
+
+ private <T> T getMetric(Class<T> metricClass, Class<?> klass, String name) {
+ String key = metricClass.getName() + klass.getName() + name;
+ Metric metric = hashMap.get(key);
+ if (metric == null) {
+ if (metricClass == Histogram.class) {
+ metric = this.getRegistry().histogram(MetricRegistry.name(klass, name));
+ }
+ if (metricClass == Timer.class) {
+ metric = this.getRegistry().timer(MetricRegistry.name(klass, name));
+ }
+ if (metricClass == Meter.class) {
+ metric = this.getRegistry().meter(MetricRegistry.name(klass, name));
+ }
+ if (metricClass == Counter.class) {
+ metric = this.getRegistry().counter(MetricRegistry.name(klass, name));
+ }
+ hashMap.put(key, metric);
+ }
+ return (T) metric;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7e0e0be8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFig.java
new file mode 100644
index 0000000..c21a78d
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFig.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.core.metrics;
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+/**
+ * Classy class class.
+ */
+@FigSingleton
+public interface MetricsFig extends GuicyFig {
+
+ @Default("false")
+ @Key( "usergrid.metrics.graphite.host" )
+ String getHost();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7e0e0be8/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index c89e62f..9656e2d 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -75,6 +75,7 @@ limitations under the License.
<slf4j.version>1.7.2</slf4j.version>
<surefire.version>2.16</surefire.version>
<aws.version>1.9.0</aws.version>
+ <metrics.version>3.0.0</metrics.version>
</properties>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7e0e0be8/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index d78ed72..36ca588 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -49,6 +49,9 @@ public interface IndexFig extends GuicyFig {
public static final String ELASTICSEARCH_FORCE_REFRESH = "elasticsearch.force_refresh";
+ public static final String INDEX_BUFFER_SIZE = "elasticsearch.buffer_size";
+
+ public static final String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_size";
/**
* the number of times we can fail before we refresh the client
@@ -113,4 +116,12 @@ public interface IndexFig extends GuicyFig {
@Default("2")
int getIndexCacheMaxWorkers();
+
+ @Default("1000")
+ @Key( INDEX_BUFFER_TIMEOUT )
+ int getIndexBufferTimeout();
+
+ @Default("100")
+ @Key( INDEX_BUFFER_SIZE )
+ int getIndexBufferSize();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7e0e0be8/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
index 645c611..29d6e4d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
@@ -16,7 +16,10 @@
*/
package org.apache.usergrid.persistence.index.impl;
+import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.index.IndexBatchBuffer;
import org.apache.usergrid.persistence.index.IndexFig;
import org.elasticsearch.action.bulk.BulkItemResponse;
@@ -30,9 +33,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
+import rx.Subscription;
import rx.functions.Action1;
import java.util.List;
+import java.util.concurrent.TimeUnit;
/**
@@ -46,15 +51,26 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
private final FailureMonitor failureMonitor;
private final IndexFig config;
private final boolean refresh;
+ private final int timeout;
+ private final int bufferSize;
+ private final MetricsFactory metricsFactory;
+ private final Timer flushTimer;
private BulkRequestBuilder bulkRequest;
private Producer producer;
+ private Subscription producerObservable;
- public IndexBatchBufferImpl(final Client client, FailureMonitor failureMonitor,final IndexFig config){
- this.client = client;
- this.failureMonitor = failureMonitor;
+ @Inject
+ public IndexBatchBufferImpl(final IndexFig config, final EsProvider provider, MetricsFactory metricsFactory){
+ this.metricsFactory = metricsFactory;
+ this.client = provider.getClient();
+ this.failureMonitor = new FailureMonitorImpl( config, provider );
this.config = config;
this.producer = new Producer();
this.refresh = config.isForcedRefresh();
+ this.timeout = config.getIndexBufferTimeout();
+ this.bufferSize = config.getIndexBufferSize();
+ this.flushTimer = metricsFactory.getTimer(IndexBatchBuffer.class,"index.buffer.flush");
+ clearBulk();
init();
}
@@ -63,24 +79,25 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
}
private void init() {
- clearBulk();
- Observable.create(producer)
- .buffer(10)
+ this.producerObservable = Observable.create(producer)
+ .doOnNext(new Action1<RequestBuilderContainer>() {
+ @Override
+ public void call(RequestBuilderContainer container) {
+ ShardReplicationOperationRequestBuilder builder = container.getBuilder();
+ if (builder instanceof IndexRequestBuilder) {
+ bulkRequest.add((IndexRequestBuilder) builder);
+ }
+ if (builder instanceof DeleteRequestBuilder) {
+ bulkRequest.add((DeleteRequestBuilder)builder);
+ }
+ }
+ })
+ .buffer(timeout, TimeUnit.MILLISECONDS, bufferSize)
.doOnNext(new Action1<List<RequestBuilderContainer>>() {
@Override
public void call(List<RequestBuilderContainer> builderContainerList) {
- System.out.println("test test!!!");
- for (RequestBuilderContainer container : builderContainerList) {
- ShardReplicationOperationRequestBuilder builder = container.getBuilder();
- if (builder instanceof IndexRequestBuilder) {
- bulkRequest.add((IndexRequestBuilder) builder);
- continue;
- }
- if (builder instanceof DeleteRequestBuilder) {
- bulkRequest.add((DeleteRequestBuilder)builder);
- continue;
- }
- }
+ flushTimer.time();
+ metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").dec(builderContainerList.size());
execute();
}
})
@@ -88,10 +105,12 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
}
public void put(IndexRequestBuilder builder){
+ metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
producer.put(new RequestBuilderContainer(builder));
}
public void put(DeleteRequestBuilder builder){
+ metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
producer.put(new RequestBuilderContainer(builder));
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7e0e0be8/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 4cd3047..736eb9b 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -201,15 +201,12 @@ public class EntityIndexTest extends BaseIT {
if(count %1000 == 0){
batch.execute();
}
-
-
-
if ( ++count > max ) {
break;
}
}
- batch.execute();
+ batch.executeAndRefresh();
timer.stop();
log.info( "Total time to index {} entries {}ms, average {}ms/entry",
new Object[] { count, timer.getTime(), timer.getTime() / count } );