You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/03 01:21:50 UTC

[02/27] incubator-usergrid git commit: Adding metrics to cp

Adding metrics to cp


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7e0e0be8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7e0e0be8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7e0e0be8

Branch: refs/heads/USERGRID-280
Commit: 7e0e0be85d2cb65990ca4f8f4dfcd09330f47a04
Parents: db40139
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Feb 17 15:35:25 2015 -0800
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Feb 17 15:35:25 2015 -0800

----------------------------------------------------------------------
 stack/corepersistence/common/pom.xml            |  12 ++
 .../persistence/core/guice/CommonModule.java    |   6 +-
 .../core/metrics/MetricsFactory.java            |  34 ++++++
 .../core/metrics/MetricsFactoryImpl.java        | 111 +++++++++++++++++++
 .../persistence/core/metrics/MetricsFig.java    |  33 ++++++
 stack/corepersistence/pom.xml                   |   1 +
 .../usergrid/persistence/index/IndexFig.java    |  11 ++
 .../index/impl/IndexBatchBufferImpl.java        |  55 ++++++---
 .../persistence/index/impl/EntityIndexTest.java |   5 +-
 9 files changed, 245 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7e0e0be8/stack/corepersistence/common/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/pom.xml b/stack/corepersistence/common/pom.xml
index 42b2996..82df1d8 100644
--- a/stack/corepersistence/common/pom.xml
+++ b/stack/corepersistence/common/pom.xml
@@ -151,6 +151,18 @@
       <scope>test</scope>
     </dependency>
 
+
+    <dependency>
+      <groupId>com.codahale.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+      <version>${metrics.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.codahale.metrics</groupId>
+      <artifactId>metrics-graphite</artifactId>
+      <version>${metrics.version}</version>
+    </dependency>
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7e0e0be8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index faa4e39..33321a4 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -19,6 +19,9 @@
 package org.apache.usergrid.persistence.core.guice;
 
 
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactoryImpl;
+import org.apache.usergrid.persistence.core.metrics.MetricsFig;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.persistence.core.astyanax.AstyanaxKeyspaceProvider;
@@ -71,7 +74,6 @@ public class CommonModule extends AbstractModule {
 
         bind( CassandraConfig.class ).to( CassandraConfigImpl.class );
 
-
         /**
          * Data migration beans
          */
@@ -79,7 +81,9 @@ public class CommonModule extends AbstractModule {
 
         bind( DataMigrationManager.class ).to( DataMigrationManagerImpl.class );
 
+        bind( MetricsFactory.class ).to( MetricsFactoryImpl.class );
 
+        install(new GuicyFigModule(MetricsFig.class));
 
         //do multibindings for migrations
         Multibinder<DataMigration> dataMigrationMultibinder = Multibinder.newSetBinder( binder(), DataMigration.class );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7e0e0be8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactory.java
new file mode 100644
index 0000000..453e556
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.core.metrics;
+
+import com.codahale.metrics.*;
+
+/**
+ * Factory for Codahale metrics: exposes the {@link MetricRegistry} and returns timers, histograms, counters and meters identified by an owning class and a name.
+ */
+public interface MetricsFactory {
+    MetricRegistry getRegistry();
+
+    Timer getTimer(Class<?> klass, String name);
+
+    Histogram getHistogram(Class<?> klass, String name);
+
+    Counter getCounter(Class<?> klass, String name);
+
+    Meter getMeter(Class<?> klass, String name);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7e0e0be8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java
new file mode 100644
index 0000000..201ee0f
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.core.metrics;
+
+
+import com.codahale.metrics.*;
+import com.codahale.metrics.graphite.Graphite;
+import com.codahale.metrics.graphite.GraphiteReporter;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Singleton {@link MetricsFactory} that creates and caches metrics in one {@link MetricRegistry}.
+ *
+ * <p>The registry is always exposed over JMX. It is also reported to Graphite every 30 seconds
+ * unless {@link MetricsFig#getHost()} returns {@code "false"}, which is its default value.
+ */
+@Singleton
+public class MetricsFactoryImpl implements MetricsFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MetricsFactoryImpl.class);
+
+    /** Host value that switches Graphite reporting off. */
+    private static final String GRAPHITE_DISABLED = "false";
+
+    /** Port the Graphite reporter connects to. */
+    private static final int GRAPHITE_PORT = 2003;
+
+    private final MetricsFig metricsFig;
+    private final MetricRegistry registry;
+    private final JmxReporter jmxReporter;
+    private final ConcurrentHashMap<String, Metric> hashMap = new ConcurrentHashMap<String, Metric>();
+
+    /** Stays {@code null} when Graphite reporting is disabled. */
+    private GraphiteReporter graphiteReporter;
+
+    /**
+     * Creates the registry and starts its reporters.
+     *
+     * <p>Annotated with {@code @Inject} because Guice can only construct a class through a
+     * no-argument constructor or one marked for injection.
+     *
+     * @param metricsFig supplies the Graphite host, or {@code "false"} to skip Graphite reporting
+     */
+    @Inject
+    public MetricsFactoryImpl(MetricsFig metricsFig) {
+        this.metricsFig = metricsFig;
+        registry = new MetricRegistry();
+
+        String metricsHost = metricsFig.getHost();
+        // Null-safe check: a missing host is treated the same as the "false" sentinel.
+        if (metricsHost != null && !GRAPHITE_DISABLED.equals(metricsHost)) {
+            Graphite graphite = new Graphite(new InetSocketAddress(metricsHost, GRAPHITE_PORT));
+            graphiteReporter = GraphiteReporter.forRegistry(registry)
+                    .prefixedWith("notifications")
+                    .convertRatesTo(TimeUnit.SECONDS)
+                    .convertDurationsTo(TimeUnit.MILLISECONDS)
+                    .filter(MetricFilter.ALL)
+                    .build(graphite);
+            graphiteReporter.start(30, TimeUnit.SECONDS);
+        } else {
+            LOG.warn("Graphite reporter not started: no Graphite host is configured.");
+        }
+
+        jmxReporter = JmxReporter.forRegistry(registry).build();
+        jmxReporter.start();
+    }
+
+    @Override
+    public MetricRegistry getRegistry() {
+        return registry;
+    }
+
+    @Override
+    public Timer getTimer(Class<?> klass, String name) {
+        return getMetric(Timer.class, klass, name);
+    }
+
+    @Override
+    public Histogram getHistogram(Class<?> klass, String name) {
+        return getMetric(Histogram.class, klass, name);
+    }
+
+    @Override
+    public Counter getCounter(Class<?> klass, String name) {
+        return getMetric(Counter.class, klass, name);
+    }
+
+    @Override
+    public Meter getMeter(Class<?> klass, String name) {
+        return getMetric(Meter.class, klass, name);
+    }
+
+    /**
+     * Returns the metric of the requested type named after {@code klass} and {@code name},
+     * registering it on first use.
+     *
+     * @param metricClass one of {@link Timer}, {@link Histogram}, {@link Counter}, {@link Meter}
+     * @param klass class whose name prefixes the metric name
+     * @param name metric name appended to the class name
+     * @return the cached or newly registered metric
+     * @throws IllegalArgumentException if {@code metricClass} is not a supported metric type
+     */
+    private <T extends Metric> T getMetric(Class<T> metricClass, Class<?> klass, String name) {
+        String key = metricClass.getName() + klass.getName() + name;
+        Metric metric = hashMap.get(key);
+        if (metric == null) {
+            String metricName = MetricRegistry.name(klass, name);
+            if (metricClass == Histogram.class) {
+                metric = registry.histogram(metricName);
+            } else if (metricClass == Timer.class) {
+                metric = registry.timer(metricName);
+            } else if (metricClass == Meter.class) {
+                metric = registry.meter(metricName);
+            } else if (metricClass == Counter.class) {
+                metric = registry.counter(metricName);
+            } else {
+                // ConcurrentHashMap rejects null values, so fail with a clear message instead.
+                throw new IllegalArgumentException("Unsupported metric type: " + metricClass.getName());
+            }
+            hashMap.put(key, metric);
+        }
+        // Checked cast, so a type mismatch fails here rather than at a distant call site.
+        return metricClass.cast(metric);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7e0e0be8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFig.java
new file mode 100644
index 0000000..c21a78d
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFig.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.core.metrics;
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+/**
+ * GuicyFig configuration for metrics reporting: the Graphite host, where the default value "false" disables the Graphite reporter.
+ */
+@FigSingleton
+public interface MetricsFig extends GuicyFig {
+
+    @Default("false")
+    @Key( "usergrid.metrics.graphite.host" )
+    String getHost();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7e0e0be8/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index c89e62f..9656e2d 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -75,6 +75,7 @@ limitations under the License.
         <slf4j.version>1.7.2</slf4j.version>
         <surefire.version>2.16</surefire.version>
         <aws.version>1.9.0</aws.version>
+        <metrics.version>3.0.0</metrics.version>
 
     </properties>
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7e0e0be8/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index d78ed72..36ca588 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -49,6 +49,9 @@ public interface IndexFig extends GuicyFig {
 
     public static final String ELASTICSEARCH_FORCE_REFRESH = "elasticsearch.force_refresh";
 
+    public static final String INDEX_BUFFER_SIZE = "elasticsearch.buffer_size";
+    /** Property key for the index buffer flush timeout; must stay distinct from INDEX_BUFFER_SIZE. */
+    public static final String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_timeout";
 
     /**
      * the number of times we can fail before we refresh the client
@@ -113,4 +116,12 @@ public interface IndexFig extends GuicyFig {
 
     @Default("2")
     int getIndexCacheMaxWorkers();
+
+    @Default("1000")
+    @Key( INDEX_BUFFER_TIMEOUT )
+    int getIndexBufferTimeout();
+
+    @Default("100")
+    @Key( INDEX_BUFFER_SIZE )
+    int getIndexBufferSize();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7e0e0be8/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
index 645c611..29d6e4d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
@@ -16,7 +16,10 @@
  */
 package org.apache.usergrid.persistence.index.impl;
 
+import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.index.IndexBatchBuffer;
 import org.apache.usergrid.persistence.index.IndexFig;
 import org.elasticsearch.action.bulk.BulkItemResponse;
@@ -30,9 +33,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
 import rx.Subscriber;
+import rx.Subscription;
 import rx.functions.Action1;
 
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 
 /**
@@ -46,15 +51,26 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
     private final FailureMonitor failureMonitor;
     private final IndexFig config;
     private final boolean refresh;
+    private final int timeout;
+    private final int bufferSize;
+    private final MetricsFactory metricsFactory;
+    private final Timer flushTimer;
     private BulkRequestBuilder bulkRequest;
     private Producer producer;
+    private Subscription producerObservable;
 
-    public IndexBatchBufferImpl(final Client client, FailureMonitor failureMonitor,final IndexFig config){
-        this.client = client;
-        this.failureMonitor = failureMonitor;
+    @Inject
+    public IndexBatchBufferImpl(final IndexFig config, final EsProvider provider, MetricsFactory metricsFactory){
+        this.metricsFactory = metricsFactory;
+        this.client = provider.getClient();
+        this.failureMonitor = new FailureMonitorImpl( config, provider );
         this.config = config;
         this.producer = new Producer();
         this.refresh = config.isForcedRefresh();
+        this.timeout = config.getIndexBufferTimeout();
+        this.bufferSize = config.getIndexBufferSize();
+        this.flushTimer = metricsFactory.getTimer(IndexBatchBuffer.class,"index.buffer.flush");
+        clearBulk();
         init();
     }
 
@@ -63,24 +79,28 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
     }
 
     private void init() {
-        clearBulk();
-        Observable.create(producer)
-                .buffer(10)
+        this.producerObservable = Observable.create(producer)
+                .doOnNext(new Action1<RequestBuilderContainer>() {
+                    @Override
+                    public void call(RequestBuilderContainer container) {
+                        ShardReplicationOperationRequestBuilder builder = container.getBuilder();
+                        if (builder instanceof IndexRequestBuilder) {
+                            bulkRequest.add((IndexRequestBuilder) builder);
+                        }
+                        if (builder instanceof DeleteRequestBuilder) {
+                            bulkRequest.add((DeleteRequestBuilder)builder);
+                        }
+                    }
+                })
+                // Emit a batch once `timeout` ms have passed or `bufferSize` requests have arrived.
+                .buffer(timeout, TimeUnit.MILLISECONDS, bufferSize)
                 .doOnNext(new Action1<List<RequestBuilderContainer>>() {
                     @Override
                     public void call(List<RequestBuilderContainer> builderContainerList) {
-                        System.out.println("test test!!!");
-                        for (RequestBuilderContainer container : builderContainerList) {
-                            ShardReplicationOperationRequestBuilder builder = container.getBuilder();
-                            if (builder instanceof IndexRequestBuilder) {
-                                bulkRequest.add((IndexRequestBuilder) builder);
-                                continue;
-                            }
-                            if (builder instanceof DeleteRequestBuilder) {
-                                bulkRequest.add((DeleteRequestBuilder)builder);
-                                continue;
-                            }
-                        }
+                        // A Timer records nothing until the Context returned by time() is stopped.
+                        final Timer.Context flushContext = flushTimer.time();
+                        metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").dec(builderContainerList.size());
                         execute();
+                        flushContext.stop();
                     }
                 })
@@ -88,10 +105,12 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
     }
 
     public void put(IndexRequestBuilder builder){
+        metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
         producer.put(new RequestBuilderContainer(builder));
     }
 
     public void put(DeleteRequestBuilder builder){
+        metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
         producer.put(new RequestBuilderContainer(builder));
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7e0e0be8/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 4cd3047..736eb9b 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -201,15 +201,12 @@ public class EntityIndexTest extends BaseIT {
             if(count %1000 == 0){
                 batch.execute();
             }
-
-
-
             if ( ++count > max ) {
                 break;
             }
         }
 
-        batch.execute();
+        batch.executeAndRefresh();
         timer.stop();
         log.info( "Total time to index {} entries {}ms, average {}ms/entry",
                 new Object[] { count, timer.getTime(), timer.getTime() / count } );