You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/07 02:05:31 UTC
[04/20] incubator-usergrid git commit: Added ES query and index
metrics.
Added ES query and index metrics.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8d378683
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8d378683
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8d378683
Branch: refs/heads/USERGRID-416
Commit: 8d37868338b67a6d4e22d96d7316b1b94bc61837
Parents: 00e7ca4
Author: GERey <gr...@apigee.com>
Authored: Wed Mar 4 15:45:57 2015 -0800
Committer: GERey <gr...@apigee.com>
Committed: Wed Mar 4 15:45:57 2015 -0800
----------------------------------------------------------------------
.../index/impl/EsEntityIndexImpl.java | 82 ++++++++++++++------
1 file changed, 59 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d378683/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 4200d13..4d4dce9 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -109,6 +109,8 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
private final IndexFig config;
+ private final MetricsFactory metricsFactory;
+
//number of times to wait for the index to refresh properly.
private static final int MAX_WAITS = 10;
@@ -126,9 +128,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
private final Timer flushTimer;
- //private final Meter flushMeter;
-
-
@Inject
public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config,
@@ -144,6 +143,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
this.alias = indexIdentifier.getAlias();
this.failureMonitor = new FailureMonitorImpl( config, provider );
this.aliasCache = indexCache;
+ this.metricsFactory = metricsFactory;
this.flushTimer = metricsFactory.getTimer( EsEntityIndexImpl.class, "entity.index.flush" );
}
@@ -175,6 +175,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
.put("action.write_consistency", writeConsistency )
.build();
+ //Added For Graphite Metrics
Timer.Context timeNewIndexCreation = flushTimer.time();
final CreateIndexResponse cir = admin.indices().prepareCreate(indexName)
.setSettings(settings)
@@ -207,20 +208,28 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
String[] indexNames = getIndexes(AliasType.Write);
for (String currentIndex : indexNames){
+ //Added For Graphite Metrics
+ Timer.Context timeRemoveAlias = flushTimer.time();
isAck = adminClient.indices().prepareAliases().removeAlias(currentIndex,
alias.getWriteAlias()).execute().actionGet().isAcknowledged();
-
+ timeRemoveAlias.stop();
logger.info("Removed Index Name [{}] from Alias=[{}] ACK=[{}]", currentIndex, alias, isAck);
}
+ //Added For Graphite Metrics
+ Timer.Context timeAddReadAlias = flushTimer.time();
// add read alias
isAck = adminClient.indices().prepareAliases().addAlias(
indexName, alias.getReadAlias()).execute().actionGet().isAcknowledged();
+ timeAddReadAlias.stop();
logger.info("Created new read Alias Name [{}] ACK=[{}]", alias.getReadAlias(), isAck);
- // add write alias
+ //Added For Graphite Metrics
+ Timer.Context timeAddWriteAlias = flushTimer.time();
+ //add write alias
isAck = adminClient.indices().prepareAliases().addAlias(
indexName, alias.getWriteAlias()).execute().actionGet().isAcknowledged();
+ timeAddWriteAlias.stop();
logger.info("Created new write Alias Name [{}] ACK=[{}]", alias.getWriteAlias(), isAck);
aliasCache.invalidate(alias);
@@ -286,12 +295,14 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
XContentBuilder xcb = IndexingUtils.createDoubleStringIndexMapping(
XContentFactory.jsonBuilder(), "_default_");
+ //Added For Graphite Metrics
+ Timer.Context timePutIndex = flushTimer.time();
PutIndexTemplateResponse pitr = esProvider.getClient().admin().indices()
.preparePutTemplate("usergrid_template")
// set mapping as the default for all types
.setTemplate(config.getIndexPrefix() + "*").addMapping( "_default_", xcb )
.execute().actionGet();
-
+ timePutIndex.stop();
if ( !pitr.isAcknowledged() ) {
throw new IndexException( "Unable to create default mappings" );
}
@@ -301,7 +312,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
@Override
public EntityIndexBatch createBatch() {
EntityIndexBatch batch = new EsEntityIndexBatchImpl(
- applicationScope, esProvider.getClient(),indexBatchBufferProducer, config, this );
+ applicationScope, esProvider.getClient(),indexBatchBufferProducer, config, this, metricsFactory );
return batch;
}
@@ -377,7 +388,10 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
}
try {
+ //Added For Graphite Metrics
+ Timer.Context timeSearch = flushTimer.time();
searchResponse = srb.execute().actionGet();
+ timeSearch.stop();
}
catch ( Throwable t ) {
logger.error( "Unable to communicate with Elasticsearch", t );
@@ -402,7 +416,10 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
.prepareSearchScroll(scrollId).setScroll( cursorTimeout + "m" );
try {
+ //Added For Graphite Metrics
+ Timer.Context timeSearchCursor = flushTimer.time();
searchResponse = ssrb.execute().actionGet();
+ timeSearchCursor.stop();
}
catch ( Throwable t ) {
logger.error( "Unable to communicate with elasticsearch", t );
@@ -471,7 +488,10 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
applicationScope.getApplication().getUuid() );
return true;
}
+ //Added For Graphite Metrics
+ Timer.Context timeRefreshIndex = flushTimer.time();
esProvider.getClient().admin().indices().prepareRefresh( indexes ).execute().actionGet();
+ timeRefreshIndex.stop();
logger.debug("Refreshed indexes: {}", StringUtils.join(indexes, ", "));
return true;
}
@@ -514,7 +534,10 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
final SearchResponse searchResponse;
try {
+ //Added For Graphite Metrics
+ Timer.Context timeEntityIndex = flushTimer.time();
searchResponse = srb.execute().actionGet();
+ timeEntityIndex.stop();
}
catch ( Throwable t ) {
logger.error( "Unable to communicate with elasticsearch" );
@@ -536,22 +559,28 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
final TermQueryBuilder tqb = QueryBuilders.termQuery( ENTITYID_ID_FIELDNAME, idString );
+ //Added For Graphite Metrics
+ final Timer.Context timeDeleteAllVersions = flushTimer.time();
final ListenableActionFuture<DeleteByQueryResponse> response = esProvider.getClient()
.prepareDeleteByQuery( alias.getWriteAlias() ).setQuery( tqb ).execute();
- response.addListener(new ActionListener<DeleteByQueryResponse>() {
+ response.addListener( new ActionListener<DeleteByQueryResponse>() {
+
@Override
- public void onResponse(DeleteByQueryResponse response) {
- logger.debug( "Deleted entity {}:{} from all index scopes with response status = {}",
- entityId.getType(), entityId.getUuid(), response.status().toString());
+ public void onResponse( DeleteByQueryResponse response ) {
+ timeDeleteAllVersions.stop();
+ logger
+ .debug( "Deleted entity {}:{} from all index scopes with response status = {}", entityId.getType(),
+ entityId.getUuid(), response.status().toString() );
checkDeleteByQueryResponse( tqb, response );
}
+
@Override
- public void onFailure(Throwable e) {
- logger.error("Deleted entity {}:{} from all index scopes with error {}",
- entityId.getType(), entityId.getUuid(), e);
+ public void onFailure( Throwable e ) {
+ logger.error( "Deleted entity {}:{} from all index scopes with error {}", entityId.getType(),
+ entityId.getUuid(), e);
}
@@ -570,26 +599,33 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
FilterBuilders.rangeFilter(ENTITY_VERSION_FIELDNAME).lt(version.timestamp())
);
+ //Added For Graphite Metrics
+ //Checks the time from the execute to the response below
+ final Timer.Context timeDeletePreviousVersions = flushTimer.time();
final ListenableActionFuture<DeleteByQueryResponse> response = esProvider.getClient()
.prepareDeleteByQuery(alias.getWriteAlias()).setQuery(fqb).execute();
- response.addListener(new ActionListener<DeleteByQueryResponse>() {
+ //Added For Graphite Metrics
+ response.addListener( new ActionListener<DeleteByQueryResponse>() {
@Override
- public void onResponse(DeleteByQueryResponse response) {
+ public void onResponse( DeleteByQueryResponse response ) {
+ timeDeletePreviousVersions.stop();
//error message needs to be retooled so that it describes the entity more throughly
- logger.debug( "Deleted entity {}:{} with version {} from all "
- + "index scopes with response status = {}",
- entityId.getType(), entityId.getUuid(), version, response.status().toString() );
+ logger
+ .debug( "Deleted entity {}:{} with version {} from all " + "index scopes with response status = {}",
+ entityId.getType(), entityId.getUuid(), version, response.status().toString() );
checkDeleteByQueryResponse( fqb, response );
}
+
@Override
- public void onFailure(Throwable e) {
- logger.error("Deleted entity {}:{} from all index scopes with error {}",
- entityId.getType(), entityId.getUuid(), e);
+ public void onFailure( Throwable e ) {
+ logger.error( "Deleted entity {}:{} from all index scopes with error {}", entityId.getType(),
+ entityId.getUuid(), e );
}
- });
+ } );
+
return response;
}