You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/05/27 19:28:47 UTC
[09/19] incubator-usergrid git commit: [USERGRID-607] Refactored
getAllEdgeDocuments to use scroll api and to do a simplified edge search.
Refactored deleteIndexEdge to take advantage of the scroll api.
[USERGRID-607] Refactored getAllEdgeDocuments to use scroll api and to do a simplified edge search.
Refactored deleteIndexEdge to take advantage of the scroll api.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/aa8b20d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/aa8b20d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/aa8b20d9
Branch: refs/heads/two-dot-o-dev
Commit: aa8b20d9ae33cddff289aa7652ee7e4fe88c9b0e
Parents: 00aa293
Author: GERey <gr...@apigee.com>
Authored: Fri May 22 14:37:56 2015 -0700
Committer: GERey <gr...@apigee.com>
Committed: Fri May 22 14:37:56 2015 -0700
----------------------------------------------------------------------
.../corepersistence/index/IndexServiceImpl.java | 27 ++++------
.../index/ApplicationEntityIndex.java | 4 +-
.../impl/EsApplicationEntityIndexImpl.java | 56 +++++++++++++++-----
3 files changed, 53 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa8b20d9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 5e2a5ea..908bd3d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -187,32 +187,23 @@ public class IndexServiceImpl implements IndexService {
final IndexEdge fromSource = generateScopeFromSource( edge );
final Id targetId = edge.getTargetNode();
+ CandidateResults targetEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromSource, targetId );
- CandidateResults targetEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromSource, targetId, 1000, 0 );
- //Should loop through and query for all documents and if there are no documents then the loop should
- // exit.
- do {
- batch = deindexBatchIteratorResolver( fromSource, targetEdgesToBeDeindexed, batch );
- if ( !targetEdgesToBeDeindexed.getOffset().isPresent() ) break;
- targetEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromSource, targetId, 1000,
- targetEdgesToBeDeindexed.getOffset().get() );
- }
- while ( !targetEdgesToBeDeindexed.isEmpty() );
+ //1. Feed the observable the candidate results you got back. getAllEdgeDocuments now does the
+ // aggregation for you, so you don't need to wrap this call in a do-while loop.
+
+
+ batch = deindexBatchIteratorResolver( fromSource, targetEdgesToBeDeindexed, batch );
+
final IndexEdge fromTarget = generateScopeFromTarget( edge );
final Id sourceId = edge.getSourceNode();
- CandidateResults sourceEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromTarget, sourceId, 1000, 0 );
+ CandidateResults sourceEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromTarget, sourceId );
- do {
- batch = deindexBatchIteratorResolver( fromTarget, sourceEdgesToBeDeindexed, batch );
- if ( !sourceEdgesToBeDeindexed.getOffset().isPresent() ) break;
- sourceEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromTarget, sourceId, 1000,
- sourceEdgesToBeDeindexed.getOffset().get() );
- }
- while ( !sourceEdgesToBeDeindexed.isEmpty() );
+ batch = deindexBatchIteratorResolver( fromTarget, sourceEdgesToBeDeindexed, batch );
return batch.execute();
} );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa8b20d9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
index 9ce65e9..b392f3c 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
@@ -54,11 +54,9 @@ public interface ApplicationEntityIndex {
* Same as search, just iterates all documents that match the index edge exactly.
* @param edge The edge to search on
* @param entityId The entity that the searchEdge is connected to.
- * @param limit The limit of the values to return per search.
- * @param offset The offset to page the query on.
* @return
*/
- CandidateResults getAllEdgeDocuments(final IndexEdge edge, final Id entityId, final int limit, final int offset);
+ CandidateResults getAllEdgeDocuments( final IndexEdge edge, final Id entityId );
/**
* Returns all entity documents that match the entityId and come before the marked version
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa8b20d9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index 99e5525..347a7b9 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -185,8 +185,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
@Override
- public CandidateResults getAllEdgeDocuments( final IndexEdge edge, final Id entityId, final int limit,
- final int offset ) {
+ public CandidateResults getAllEdgeDocuments( final IndexEdge edge, final Id entityId ) {
/**
* Take a list of IndexEdge, with an entityId
and query Es directly for matches
@@ -194,26 +193,59 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
*/
IndexValidationUtils.validateSearchEdge( edge );
Preconditions.checkNotNull( entityId, "entityId cannot be null" );
- Preconditions.checkArgument( limit > 0, "limit must be > 0" );
SearchResponse searchResponse;
+ List<CandidateResult> candidates = new ArrayList<>();
+
final ParsedQuery parsedQuery = ParsedQueryBuilder.build( "select *" );
- FilterBuilders.idsFilter( entityId.getType() );
- final SearchRequestBuilder srb = searchRequest.getBuilder( edge, SearchTypes.fromTypes( entityId.getType() ),
- parsedQuery, limit, offset ).setTimeout( TimeValue.timeValueMillis( queryTimeout ) );
+ final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder();
+
+ //I can't just search on the entity Id.
+
+ FilterBuilder entityEdgeFilter = FilterBuilders.termFilter( IndexingUtils.EDGE_NODE_ID_FIELDNAME,
+ IndexingUtils.idString( edge.getNodeId() ));
+
+ srb.setPostFilter(entityEdgeFilter);
if ( logger.isDebugEnabled() ) {
- logger.debug( "Searching for edge index (read alias): {}\n nodeId: {}, edgeType: {}, \n type: {}\n query: {} ",
- this.alias.getReadAlias(), edge.getNodeId(), edge.getEdgeName(),
- SearchTypes.fromTypes( entityId.getType() ), srb );
+ logger.debug( "Searching for marked versions in index (read alias): {}\n nodeId: {},\n query: {} ",
+ this.alias.getReadAlias(),entityId, srb );
}
try {
//Added For Graphite Metrics
Timer.Context timeSearch = searchTimer.time();
- searchResponse = srb.execute().actionGet();
+
+ //set the timeout on the scroll cursor to 6 seconds and set the number of values returned per shard to 100.
+ //The settings for the scroll aren't tested and so we aren't sure what values would be best in a production environment
+ //TODO: review this and make them not magic numbers when acking this PR.
+ searchResponse = srb.setScroll( new TimeValue( 6000 ) ).setSize( 100 ).execute().actionGet();
+
+
+ while(true){
+ //add search result hits to some sort of running tally of hits.
+ candidates = aggregateScrollResults( candidates, searchResponse );
+
+ SearchScrollRequestBuilder ssrb = searchRequestBuilderStrategyV2
+ .getScrollBuilder( searchResponse.getScrollId() )
+ .setScroll( new TimeValue( 6000 ) );
+
+ //TODO: figure out how to log exactly what we're putting into elasticsearch
+ // if ( logger.isDebugEnabled() ) {
+ // logger.debug( "Scroll search using query: {} ",
+ // ssrb.toString() );
+ // }
+
+ searchResponse = ssrb.execute().actionGet();
+
+ if (searchResponse.getHits().getHits().length == 0) {
+ break;
+ }
+
+
+ }
timeSearch.stop();
}
catch ( Throwable t ) {
@@ -223,7 +255,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
}
failureMonitor.success();
- return parseResults(searchResponse, parsedQuery, limit, offset);
+ return new CandidateResults( candidates, parsedQuery.getSelectFieldMappings());
}
@@ -264,8 +296,6 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
//Added For Graphite Metrics
Timer.Context timeSearch = searchTimer.time();
- //Refactor this out and have it return a modified parseResults that will create the candidateResults from
- //the hit results and then keep that
//set the timeout on the scroll cursor to 6 seconds and set the number of values returned per shard to 100.
//The settings for the scroll aren't tested and so we aren't sure what values would be best in a production environment
//TODO: review this and make them not magic numbers when acking this PR.