You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/11/02 23:56:13 UTC
[22/50] [abbrv] usergrid git commit: - Convert search in ES to use
term queries instead of scroll with filters. - Fix delete so we remove
connections from Elasticsearch as well as the entity on an entity delete event.
- Convert search in Elasticsearch (ES) to use term queries instead of scrolling with post-filters.
- Fix delete so that, on an entity delete event, the entity's connections are removed from Elasticsearch as well as the entity itself.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/22beca2c
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/22beca2c
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/22beca2c
Branch: refs/heads/USERGRID-909
Commit: 22beca2cb3714833ddf6a748522c6bead17fe9f2
Parents: 2686054
Author: Michael Russo <mi...@gmail.com>
Authored: Wed Oct 28 15:58:41 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Wed Oct 28 15:58:41 2015 -0700
----------------------------------------------------------------------
.../corepersistence/index/IndexServiceImpl.java | 2 +-
.../persistence/index/CandidateResult.java | 11 +-
.../persistence/index/EntityIndexBatch.java | 2 +-
.../usergrid/persistence/index/IndexFig.java | 6 +
.../index/impl/DeIndexOperation.java | 5 +
.../index/impl/EsEntityIndexBatchImpl.java | 34 ++++-
.../index/impl/EsEntityIndexImpl.java | 149 +++++++++++--------
.../persistence/index/impl/IndexingUtils.java | 2 +-
8 files changed, 137 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index ccb6221..b2a1a2a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -219,7 +219,7 @@ public class IndexServiceImpl implements IndexService {
//collect results into a single batch
.collect( () -> ei.createBatch(), ( batch, candidateResult ) -> {
logger.debug( "Deindexing on edge {} for entity {} added to batch",searchEdge , entityId );
- batch.deindex( searchEdge, candidateResult );
+ batch.deindex( candidateResult );
} )
//return the future from the batch execution
.flatMap( batch ->Observable.just(batch.build()) );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java
index 2375509..9b1898c 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java
@@ -32,10 +32,12 @@ import org.apache.usergrid.persistence.model.entity.Id;
public class CandidateResult implements EntityVersion {
private final Id entityId;
private final UUID entityVersion;
+ private final String docId;
- public CandidateResult( Id entityId, UUID entityVersion ) {
+ public CandidateResult( Id entityId, UUID entityVersion, String docId ) {
this.entityId = entityId;
this.entityVersion = entityVersion;
+ this.docId = docId;
}
@Override
@@ -48,6 +50,10 @@ public class CandidateResult implements EntityVersion {
return entityId;
}
+ public String getDocId() {
+ return docId;
+ }
+
@Override
public boolean equals( final Object o ) {
@@ -66,6 +72,9 @@ public class CandidateResult implements EntityVersion {
if ( !entityVersion.equals( that.entityVersion ) ) {
return false;
}
+ if ( !docId.equals( that.docId ) ) {
+ return false;
+ }
return true;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index 98652c1..17dd4d3 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -63,7 +63,7 @@ public interface EntityIndexBatch {
EntityIndexBatch deindex( final SearchEdge searchEdge, final Id id, final UUID version );
-
+ EntityIndexBatch deindex( final CandidateResult candidateResult);
/**
* get the batches
http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index e093d7d..33f199a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -60,6 +60,8 @@ public interface IndexFig extends GuicyFig {
String ELASTICSEARCH_CLIENT_TYPE = "elasticsearch.client.type";
+ String ELASTICSEARCH_VERSION_QUERY_LIMIT = "elasticsearch.version_query_limit";
+
/**
* Comma-separated list of Elasticsearch hosts.
@@ -199,4 +201,8 @@ public interface IndexFig extends GuicyFig {
@Default("1000")
@Key( "elasticsearch_queue_error_sleep_ms" )
long getSleepTimeForQueueError();
+
+ @Default("1000")
+ @Key( ELASTICSEARCH_VERSION_QUERY_LIMIT )
+ int getVersionQueryLimit();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
index 4060dac..dbecf8a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
@@ -61,6 +61,11 @@ public class DeIndexOperation implements BatchOperation {
this.documentId = createIndexDocId( applicationScope, id, version, searchEdge );
}
+ public DeIndexOperation( String[] indexes, String docId) {
+ this.indexes = indexes;
+ this.documentId = docId;
+ }
+
@Override
public void doOperation( final Client client, final BulkRequestBuilder bulkRequest ) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 68830ca..a8fb751 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -84,23 +84,42 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
@Override
public EntityIndexBatch deindex( final SearchEdge searchEdge, final Id id, final UUID version ) {
- IndexValidationUtils.validateSearchEdge( searchEdge );
- ValidationUtils.verifyIdentity( id );
+ IndexValidationUtils.validateSearchEdge(searchEdge);
+ ValidationUtils.verifyIdentity(id);
ValidationUtils.verifyVersion( version );
String[] indexes = entityIndex.getIndexes();
//get the default index if no alias exists yet
if ( indexes == null || indexes.length == 0 ) {
- throw new IllegalStateException("No indexes exist for " + indexLocationStrategy.getAlias().getWriteAlias());
+ throw new IllegalStateException("No indexes exist for " + indexLocationStrategy.getAlias().getWriteAlias());
}
if ( log.isDebugEnabled() ) {
log.debug( "Deindexing to indexes {} with scope {} on edge {} with id {} and version {} ",
- new Object[] { indexes, applicationScope, searchEdge, id, version } );
+ new Object[] { indexes, applicationScope, searchEdge, id, version } );
}
- container.addDeIndexRequest( new DeIndexOperation( indexes, applicationScope, searchEdge, id, version ) );
+ container.addDeIndexRequest(new DeIndexOperation(indexes, applicationScope, searchEdge, id, version));
+
+ return this;
+ }
+
+ public EntityIndexBatch deindexWithDocId( final String docId ) {
+
+ String[] indexes = entityIndex.getIndexes();
+ //get the default index if no alias exists yet
+ if ( indexes == null || indexes.length == 0 ) {
+ throw new IllegalStateException("No indexes exist for " + indexLocationStrategy.getAlias().getWriteAlias());
+ }
+
+ if ( log.isDebugEnabled() ) {
+ log.debug( "Deindexing to indexes {} with with documentId {} ",
+ new Object[] { indexes, docId } );
+ }
+
+
+ container.addDeIndexRequest( new DeIndexOperation( indexes, docId ) );
return this;
}
@@ -117,6 +136,11 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
return deindex( searchEdge, entity.getId(), entity.getVersion() );
}
+ @Override
+ public EntityIndexBatch deindex( final CandidateResult entity ) {
+
+ return deindexWithDocId(entity.getDocId());
+ }
@Override
http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 9be591e..0b7f2d7 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -65,6 +65,7 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
+import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -389,7 +390,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
final ParsedQuery parsedQuery = ParsedQueryBuilder.build(query);
final SearchRequestBuilder srb = searchRequest.getBuilder( searchEdge, searchTypes, parsedQuery, limit, offset )
- .setTimeout( TimeValue.timeValueMillis( queryTimeout ) );
+ .setTimeout(TimeValue.timeValueMillis(queryTimeout));
if ( logger.isDebugEnabled() ) {
logger.debug( "Searching index (read alias): {}\n nodeId: {}, edgeType: {}, \n type: {}\n query: {} ",
@@ -427,57 +428,62 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
*/
IndexValidationUtils.validateSearchEdge(edge);
- Preconditions.checkNotNull( entityId, "entityId cannot be null" );
+ Preconditions.checkNotNull(entityId, "entityId cannot be null");
SearchResponse searchResponse;
-
List<CandidateResult> candidates = new ArrayList<>();
- final ParsedQuery parsedQuery = ParsedQueryBuilder.build( "select *" );
-
- final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder();
-
- //I can't just search on the entity Id.
+ //never let the limit be less than 2 as there are potential indefinite paging issues
+ final int searchLimit = Math.max(2, indexFig.getVersionQueryLimit());
- FilterBuilder entityEdgeFilter = FilterBuilders.termFilter(IndexingUtils.EDGE_NODE_ID_FIELDNAME,
- IndexingUtils.nodeId(edge.getNodeId()));
+ final QueryBuilder entityQuery = QueryBuilders
+ .termQuery(IndexingUtils.EDGE_NODE_ID_FIELDNAME, IndexingUtils.nodeId(edge.getNodeId()));
- srb.setPostFilter(entityEdgeFilter);
+ final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder();
if ( logger.isDebugEnabled() ) {
- logger.debug( "Searching for marked versions in index (read alias): {}\n nodeId: {},\n query: {} ",
+ logger.debug( "Searching for edges in (read alias): {}\n nodeId: {},\n query: {} ",
this.alias.getReadAlias(),entityId, srb );
}
try {
- //Added For Graphite Metrics
-
- //set the timeout on the scroll cursor to 6 seconds and set the number of values returned per shard to 100.
- //The settings for the scroll aren't tested and so we aren't sure what vlaues would be best in a production enviroment
- //TODO: review this and make them not magic numbers when acking this PR.
- searchResponse = srb.setScroll( new TimeValue( 6000 ) ).setSize( 100 ).execute().actionGet();
+ long queryTimestamp = 0L;
while(true){
- //add search result hits to some sort of running tally of hits.
- candidates = aggregateScrollResults( candidates, searchResponse, null );
- SearchScrollRequestBuilder ssrb = searchRequestBuilderStrategyV2
- .getScrollBuilder( searchResponse.getScrollId() )
- .setScroll( new TimeValue( 6000 ) );
+ QueryBuilder timestampQuery = QueryBuilders
+ .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME)
+ .gte(queryTimestamp);
- //TODO: figure out how to log exactly what we're putting into elasticsearch
- // if ( logger.isDebugEnabled() ) {
- // logger.debug( "Scroll search using query: {} ",
- // ssrb.toString() );
- // }
+ QueryBuilder finalQuery = QueryBuilders
+ .boolQuery()
+ .must(entityQuery)
+ .must(timestampQuery);
- searchResponse = ssrb.execute().actionGet();
+ searchResponse = srb
+ .setQuery(finalQuery)
+ .setSize(searchLimit)
+ .addSort(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME, SortOrder.ASC)
+ .execute()
+ .actionGet();
- if (searchResponse.getHits().getHits().length == 0) {
+ int responseSize = searchResponse.getHits().getHits().length;
+ if(responseSize == 0){
break;
}
+ // update queryTimestamp to be the timestamp of the last entity returned from the query
+ queryTimestamp = (long) searchResponse
+ .getHits().getAt(responseSize - 1)
+ .getSource().get(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME);
+
+ candidates = aggregateScrollResults(candidates, searchResponse, null);
+
+ if(responseSize < searchLimit){
+
+ break;
+ }
}
}
@@ -488,7 +494,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
}
failureMonitor.success();
- return new CandidateResults( candidates, parsedQuery.getSelectFieldMappings());
+ return new CandidateResults( candidates, Collections.EMPTY_SET);
}
@@ -496,61 +502,74 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
public CandidateResults getAllEntityVersionsBeforeMarkedVersion( final Id entityId, final UUID markedVersion ) {
Preconditions.checkNotNull( entityId, "entityId cannot be null" );
- Preconditions.checkNotNull( markedVersion, "markedVersion cannot be null" );
+ Preconditions.checkNotNull(markedVersion, "markedVersion cannot be null");
ValidationUtils.verifyVersion(markedVersion);
SearchResponse searchResponse;
-
List<CandidateResult> candidates = new ArrayList<>();
- final ParsedQuery parsedQuery = ParsedQueryBuilder.build( "select *" );
+ final long markedTimestamp = markedVersion.timestamp();
- final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder();
-
- FilterBuilder entityIdFilter = FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME,
- IndexingUtils.entityId(entityId));
+ // never let the limit be less than 2 as there are potential indefinite paging issues
+ final int searchLimit = Math.max(2, indexFig.getVersionQueryLimit());
- srb.setPostFilter(entityIdFilter);
+ // this query will find the document for the entity itself
+ final QueryBuilder entityQuery = QueryBuilders
+ .termQuery(IndexingUtils.ENTITY_ID_FIELDNAME, IndexingUtils.entityId(entityId));
+ // this query will find all the documents where this entity is a source/target node
+ final QueryBuilder nodeQuery = QueryBuilders
+ .termQuery(IndexingUtils.EDGE_NODE_ID_FIELDNAME, IndexingUtils.nodeId(entityId));
-
- if ( logger.isDebugEnabled() ) {
- logger.debug( "Searching for marked versions in index (read alias): {}\n nodeId: {},\n query: {} ",
- this.alias.getReadAlias(),entityId, srb );
- }
+ final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder();
try {
- //Added For Graphite Metrics
- //set the timeout on the scroll cursor to 6 seconds and set the number of values returned per shard to 100.
- //The settings for the scroll aren't tested and so we aren't sure what vlaues would be best in a production enviroment
- //TODO: review this and make them not magic numbers when acking this PR.
- searchResponse = srb.setScroll( new TimeValue( 6000 ) ).setSize( 100 ).execute().actionGet();
+ long queryTimestamp = 0L;
- //list that will hold all of the search hits
+ while(true){
+ QueryBuilder timestampQuery = QueryBuilders
+ .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME)
+ .gte(queryTimestamp)
+ .lte(markedTimestamp);
+
+ QueryBuilder entityQueryWithTimestamp = QueryBuilders
+ .boolQuery()
+ .must(entityQuery)
+ .must(timestampQuery);
+
+ QueryBuilder finalQuery = QueryBuilders
+ .boolQuery()
+ .should(entityQueryWithTimestamp)
+ .should(nodeQuery)
+ .minimumNumberShouldMatch(1);
+
+ searchResponse = srb
+ .setQuery(finalQuery)
+ .setSize(searchLimit)
+ .addSort(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME, SortOrder.ASC)
+ .execute()
+ .actionGet();
- while(true){
- //add search result hits to some sort of running tally of hits.
- candidates = aggregateScrollResults( candidates, searchResponse, markedVersion);
+ int responseSize = searchResponse.getHits().getHits().length;
+ if(responseSize == 0){
+ break;
+ }
+
+ candidates = aggregateScrollResults(candidates, searchResponse, markedVersion);
- SearchScrollRequestBuilder ssrb = searchRequestBuilderStrategyV2
- .getScrollBuilder( searchResponse.getScrollId() )
- .setScroll( new TimeValue( 6000 ) );
+ // update queryTimestamp to be the timestamp of the last entity returned from the query
+ queryTimestamp = (long) searchResponse
+ .getHits().getAt(responseSize - 1)
+ .getSource().get(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME);
- //TODO: figure out how to log exactly what we're putting into elasticsearch
-// if ( logger.isDebugEnabled() ) {
-// logger.debug( "Scroll search using query: {} ",
-// ssrb.toString() );
-// }
- searchResponse = ssrb.execute().actionGet();
+ if(responseSize < searchLimit){
- if (searchResponse.getHits().getHits().length == 0) {
break;
}
-
}
}
catch ( Throwable t ) {
@@ -560,7 +579,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
}
failureMonitor.success();
- return new CandidateResults( candidates, parsedQuery.getSelectFieldMappings());
+ return new CandidateResults( candidates, Collections.EMPTY_SET);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
index 9e06fa6..18cb928 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
@@ -241,7 +241,7 @@ public class IndexingUtils {
Id entityId = new SimpleId( UUID.fromString( entityUUID ), entityType );
- return new CandidateResult( entityId, UUID.fromString( versionUUID ) );
+ return new CandidateResult( entityId, UUID.fromString( versionUUID ), documentId );
}