You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/09 16:14:21 UTC

[04/30] incubator-usergrid git commit: Added ES query and index metrics.

Added ES query and index metrics.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8d378683
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8d378683
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8d378683

Branch: refs/heads/two-dot-o
Commit: 8d37868338b67a6d4e22d96d7316b1b94bc61837
Parents: 00e7ca4
Author: GERey <gr...@apigee.com>
Authored: Wed Mar 4 15:45:57 2015 -0800
Committer: GERey <gr...@apigee.com>
Committed: Wed Mar 4 15:45:57 2015 -0800

----------------------------------------------------------------------
 .../index/impl/EsEntityIndexImpl.java           | 82 ++++++++++++++------
 1 file changed, 59 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d378683/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 4200d13..4d4dce9 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -109,6 +109,8 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
 
     private final IndexFig config;
 
+    private final MetricsFactory metricsFactory;
+
 
     //number of times to wait for the index to refresh properly.
     private static final int MAX_WAITS = 10;
@@ -126,9 +128,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
 
     private final Timer flushTimer;
 
-    //private final Meter flushMeter;
-
-
 
     @Inject
     public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config,
@@ -144,6 +143,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
         this.alias = indexIdentifier.getAlias();
         this.failureMonitor = new FailureMonitorImpl( config, provider );
         this.aliasCache = indexCache;
+        this.metricsFactory = metricsFactory;
         this.flushTimer = metricsFactory.getTimer( EsEntityIndexImpl.class, "entity.index.flush" );
     }
 
@@ -175,6 +175,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
                         .put("action.write_consistency", writeConsistency )
                     .build();
 
+                //Added For Graphite Metrics
                 Timer.Context timeNewIndexCreation = flushTimer.time();
                 final CreateIndexResponse cir = admin.indices().prepareCreate(indexName)
                         .setSettings(settings)
@@ -207,20 +208,28 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
             String[] indexNames = getIndexes(AliasType.Write);
 
             for (String currentIndex : indexNames){
+                //Added For Graphite Metrics
+                Timer.Context timeRemoveAlias = flushTimer.time();
                 isAck = adminClient.indices().prepareAliases().removeAlias(currentIndex,
                         alias.getWriteAlias()).execute().actionGet().isAcknowledged();
-
+                timeRemoveAlias.stop();
                 logger.info("Removed Index Name [{}] from Alias=[{}] ACK=[{}]", currentIndex, alias, isAck);
             }
 
+            //Added For Graphite Metrics
+            Timer.Context timeAddReadAlias = flushTimer.time();
             // add read alias
             isAck = adminClient.indices().prepareAliases().addAlias(
                     indexName, alias.getReadAlias()).execute().actionGet().isAcknowledged();
+            timeAddReadAlias.stop();
             logger.info("Created new read Alias Name [{}] ACK=[{}]", alias.getReadAlias(), isAck);
 
-            // add write alias
+            //Added For Graphite Metrics
+            Timer.Context timeAddWriteAlias = flushTimer.time();
+            //add write alias
             isAck = adminClient.indices().prepareAliases().addAlias(
                     indexName, alias.getWriteAlias()).execute().actionGet().isAcknowledged();
+            timeAddWriteAlias.stop();
             logger.info("Created new write Alias Name [{}] ACK=[{}]", alias.getWriteAlias(), isAck);
 
             aliasCache.invalidate(alias);
@@ -286,12 +295,14 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
         XContentBuilder xcb = IndexingUtils.createDoubleStringIndexMapping(
                 XContentFactory.jsonBuilder(), "_default_");
 
+        //Added For Graphite Metrics
+        Timer.Context timePutIndex = flushTimer.time();
         PutIndexTemplateResponse pitr = esProvider.getClient().admin().indices()
                 .preparePutTemplate("usergrid_template")
                 // set mapping as the default for all types
                 .setTemplate(config.getIndexPrefix() + "*").addMapping( "_default_", xcb )
                 .execute().actionGet();
-
+        timePutIndex.stop();
         if ( !pitr.isAcknowledged() ) {
             throw new IndexException( "Unable to create default mappings" );
         }
@@ -301,7 +312,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
     @Override
     public EntityIndexBatch createBatch() {
         EntityIndexBatch batch = new EsEntityIndexBatchImpl(
-                applicationScope, esProvider.getClient(),indexBatchBufferProducer, config, this );
+                applicationScope, esProvider.getClient(),indexBatchBufferProducer, config, this, metricsFactory );
         return batch;
     }
 
@@ -377,7 +388,10 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
             }
 
             try {
+                //Added For Graphite Metrics
+                Timer.Context timeSearch = flushTimer.time();
                 searchResponse = srb.execute().actionGet();
+                timeSearch.stop();
             }
             catch ( Throwable t ) {
                 logger.error( "Unable to communicate with Elasticsearch", t );
@@ -402,7 +416,10 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
                     .prepareSearchScroll(scrollId).setScroll( cursorTimeout + "m" );
 
             try {
+                //Added For Graphite Metrics
+                Timer.Context timeSearchCursor = flushTimer.time();
                 searchResponse = ssrb.execute().actionGet();
+                timeSearchCursor.stop();
             }
             catch ( Throwable t ) {
                 logger.error( "Unable to communicate with elasticsearch", t );
@@ -471,7 +488,10 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
                                 applicationScope.getApplication().getUuid() );
                         return true;
                     }
+                    //Added For Graphite Metrics
+                    Timer.Context timeRefreshIndex = flushTimer.time();
                     esProvider.getClient().admin().indices().prepareRefresh( indexes ).execute().actionGet();
+                    timeRefreshIndex.stop();
                     logger.debug("Refreshed indexes: {}", StringUtils.join(indexes, ", "));
                     return true;
                 }
@@ -514,7 +534,10 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
 
         final SearchResponse searchResponse;
         try {
+            //Added For Graphite Metrics
+            Timer.Context timeEntityIndex = flushTimer.time();
             searchResponse = srb.execute().actionGet();
+            timeEntityIndex.stop();
         }
         catch ( Throwable t ) {
             logger.error( "Unable to communicate with elasticsearch" );
@@ -536,22 +559,28 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
 
         final TermQueryBuilder tqb = QueryBuilders.termQuery( ENTITYID_ID_FIELDNAME, idString );
 
+        //Added For Graphite Metrics
+        final Timer.Context timeDeleteAllVersions = flushTimer.time();
         final ListenableActionFuture<DeleteByQueryResponse> response = esProvider.getClient()
             .prepareDeleteByQuery( alias.getWriteAlias() ).setQuery( tqb ).execute();
 
-        response.addListener(new ActionListener<DeleteByQueryResponse>() {
+        response.addListener( new ActionListener<DeleteByQueryResponse>() {
+
             @Override
-            public void onResponse(DeleteByQueryResponse response) {
-                logger.debug( "Deleted entity {}:{} from all index scopes with response status = {}",
-                    entityId.getType(), entityId.getUuid(), response.status().toString());
+            public void onResponse( DeleteByQueryResponse response ) {
+                timeDeleteAllVersions.stop();
+                logger
+                    .debug( "Deleted entity {}:{} from all index scopes with response status = {}", entityId.getType(),
+                        entityId.getUuid(), response.status().toString() );
 
                 checkDeleteByQueryResponse( tqb, response );
             }
 
+
             @Override
-            public void onFailure(Throwable e) {
-                logger.error("Deleted entity {}:{} from all index scopes with error {}",
-                    entityId.getType(), entityId.getUuid(), e);
+            public void onFailure( Throwable e ) {
+                logger.error( "Deleted entity {}:{} from all index scopes with error {}", entityId.getType(),
+                    entityId.getUuid(), e);
 
 
             }
@@ -570,26 +599,33 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
                 FilterBuilders.rangeFilter(ENTITY_VERSION_FIELDNAME).lt(version.timestamp())
         );
 
+        //Added For Graphite Metrics
+        //Checks the time from the execute to the response below
+        final Timer.Context timeDeletePreviousVersions = flushTimer.time();
         final ListenableActionFuture<DeleteByQueryResponse> response = esProvider.getClient()
             .prepareDeleteByQuery(alias.getWriteAlias()).setQuery(fqb).execute();
 
-        response.addListener(new ActionListener<DeleteByQueryResponse>() {
+        //Added For Graphite Metrics
+        response.addListener( new ActionListener<DeleteByQueryResponse>() {
             @Override
-            public void onResponse(DeleteByQueryResponse response) {
+            public void onResponse( DeleteByQueryResponse response ) {
+                timeDeletePreviousVersions.stop();
                 //error message needs to be retooled so that it describes the entity more throughly
-                logger.debug( "Deleted entity {}:{} with version {} from all "
-                        + "index scopes with response status = {}",
-                    entityId.getType(), entityId.getUuid(), version,  response.status().toString()  );
+                logger
+                    .debug( "Deleted entity {}:{} with version {} from all " + "index scopes with response status = {}",
+                        entityId.getType(), entityId.getUuid(), version, response.status().toString() );
 
                 checkDeleteByQueryResponse( fqb, response );
             }
 
+
             @Override
-            public void onFailure(Throwable e) {
-                logger.error("Deleted entity {}:{} from all index scopes with error {}",
-                    entityId.getType(), entityId.getUuid(), e);
+            public void onFailure( Throwable e ) {
+                logger.error( "Deleted entity {}:{} from all index scopes with error {}", entityId.getType(),
+                    entityId.getUuid(), e );
             }
-        });
+        } );
+
 
         return response;
     }