You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/05/27 19:28:47 UTC

[09/19] incubator-usergrid git commit: [USERGRID-607] Refactored getAllEdgeDocuments to use scroll api and to do a simplified edge search. Refactored deleteIndexEdge to take advantage of the scroll api.

[USERGRID-607] Refactored getAllEdgeDocuments to use scroll api and to do a simplified edge search.
Refactored deleteIndexEdge to take advantage of the scroll api.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/aa8b20d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/aa8b20d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/aa8b20d9

Branch: refs/heads/two-dot-o-dev
Commit: aa8b20d9ae33cddff289aa7652ee7e4fe88c9b0e
Parents: 00aa293
Author: GERey <gr...@apigee.com>
Authored: Fri May 22 14:37:56 2015 -0700
Committer: GERey <gr...@apigee.com>
Committed: Fri May 22 14:37:56 2015 -0700

----------------------------------------------------------------------
 .../corepersistence/index/IndexServiceImpl.java | 27 ++++------
 .../index/ApplicationEntityIndex.java           |  4 +-
 .../impl/EsApplicationEntityIndexImpl.java      | 56 +++++++++++++++-----
 3 files changed, 53 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa8b20d9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 5e2a5ea..908bd3d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -187,32 +187,23 @@ public class IndexServiceImpl implements IndexService {
                 final IndexEdge fromSource = generateScopeFromSource( edge );
                 final Id targetId = edge.getTargetNode();
 
+                CandidateResults targetEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromSource, targetId );
 
-                CandidateResults targetEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromSource, targetId, 1000, 0 );
 
-                //Should loop thorugh and query for all documents and if there are no documents then the loop should
-                // exit.
-                do {
-                    batch = deindexBatchIteratorResolver( fromSource, targetEdgesToBeDeindexed, batch );
-                    if ( !targetEdgesToBeDeindexed.getOffset().isPresent() ) break;
-                    targetEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromSource, targetId, 1000,
-                        targetEdgesToBeDeindexed.getOffset().get() );
-                }
-                while ( !targetEdgesToBeDeindexed.isEmpty() );
+                //1. Feed the observable the candidate results you got back. Since it now does the aggregation for you
+                // you don't need to worry about putting your code in a do while.
+
+
+                batch = deindexBatchIteratorResolver( fromSource, targetEdgesToBeDeindexed, batch );
+
 
 
                 final IndexEdge fromTarget = generateScopeFromTarget( edge );
                 final Id sourceId = edge.getSourceNode();
 
-                CandidateResults sourceEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromTarget, sourceId, 1000, 0 );
+                CandidateResults sourceEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromTarget, sourceId );
 
-                do {
-                    batch = deindexBatchIteratorResolver( fromTarget, sourceEdgesToBeDeindexed, batch );
-                    if ( !sourceEdgesToBeDeindexed.getOffset().isPresent() ) break;
-                    sourceEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromTarget, sourceId, 1000,
-                        sourceEdgesToBeDeindexed.getOffset().get() );
-                }
-                while ( !sourceEdgesToBeDeindexed.isEmpty() );
+                batch = deindexBatchIteratorResolver( fromTarget, sourceEdgesToBeDeindexed, batch );
 
                 return batch.execute();
             } );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa8b20d9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
index 9ce65e9..b392f3c 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
@@ -54,11 +54,9 @@ public interface ApplicationEntityIndex {
      * Same as search, just iterates all documents that match the index edge exactly.
      * @param edge The edge to search on
      * @param entityId The entity that the searchEdge is connected to.
-     * @param limit The limit of the values to return per search.
-     * @param offset The offset to page the query on.
      * @return
      */
-    CandidateResults getAllEdgeDocuments(final IndexEdge edge, final Id entityId,  final int limit, final int offset);
+    CandidateResults getAllEdgeDocuments( final IndexEdge edge, final Id entityId );
 
     /**
      * Returns all entity documents that match the entityId and come before the marked version

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa8b20d9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index 99e5525..347a7b9 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -185,8 +185,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
 
 
     @Override
-    public CandidateResults getAllEdgeDocuments( final IndexEdge edge, final Id entityId, final int limit,
-                                                 final int offset ) {
+    public CandidateResults getAllEdgeDocuments( final IndexEdge edge, final Id entityId ) {
         /**
          * Take a list of IndexEdge, with an entityId
          and query Es directly for matches
@@ -194,26 +193,59 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
          */
         IndexValidationUtils.validateSearchEdge( edge );
         Preconditions.checkNotNull( entityId, "entityId cannot be null" );
-        Preconditions.checkArgument( limit > 0, "limit must be > 0" );
 
         SearchResponse searchResponse;
 
+        List<CandidateResult> candidates = new ArrayList<>();
+
         final ParsedQuery parsedQuery = ParsedQueryBuilder.build( "select *" );
-        FilterBuilders.idsFilter( entityId.getType() );
 
-        final SearchRequestBuilder srb = searchRequest.getBuilder( edge, SearchTypes.fromTypes( entityId.getType() ),
-            parsedQuery, limit, offset ).setTimeout( TimeValue.timeValueMillis( queryTimeout ) );
+        final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder();
+
+        //I can't just search on the entity Id.
+
+        FilterBuilder entityEdgeFilter = FilterBuilders.termFilter( IndexingUtils.EDGE_NODE_ID_FIELDNAME,
+            IndexingUtils.idString( edge.getNodeId() ));
+
+        srb.setPostFilter(entityEdgeFilter);
 
         if ( logger.isDebugEnabled() ) {
-            logger.debug( "Searching for edge index (read alias): {}\n  nodeId: {}, edgeType: {},  \n type: {}\n   query: {} ",
-                this.alias.getReadAlias(), edge.getNodeId(), edge.getEdgeName(),
-                SearchTypes.fromTypes( entityId.getType() ), srb );
+            logger.debug( "Searching for marked versions in index (read alias): {}\n  nodeId: {},\n   query: {} ",
+                this.alias.getReadAlias(),entityId, srb );
         }
 
         try {
             //Added For Graphite Metrics
             Timer.Context timeSearch = searchTimer.time();
-            searchResponse = srb.execute().actionGet();
+
+            //set the timeout on the scroll cursor to 6 seconds and set the number of values returned per shard to 100.
+            //The settings for the scroll aren't tested, so we aren't sure what values would be best in a production environment.
+            //TODO: review this and make them not magic numbers when acking this PR.
+            searchResponse = srb.setScroll( new TimeValue( 6000 ) ).setSize( 100 ).execute().actionGet();
+            
+
+            while(true){
+                //add search result hits to some sort of running tally of hits.
+                candidates = aggregateScrollResults( candidates, searchResponse );
+
+                SearchScrollRequestBuilder ssrb = searchRequestBuilderStrategyV2
+                    .getScrollBuilder( searchResponse.getScrollId() )
+                    .setScroll( new TimeValue( 6000 ) );
+
+                //TODO: figure out how to log exactly what we're putting into elasticsearch
+                //                if ( logger.isDebugEnabled() ) {
+                //                    logger.debug( "Scroll search using query: {} ",
+                //                        ssrb.toString() );
+                //                }
+
+                searchResponse = ssrb.execute().actionGet();
+
+                if (searchResponse.getHits().getHits().length == 0) {
+                    break;
+                }
+
+
+            }
             timeSearch.stop();
         }
         catch ( Throwable t ) {
@@ -223,7 +255,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
         }
         failureMonitor.success();
 
-        return parseResults(searchResponse, parsedQuery, limit, offset);
+        return new CandidateResults( candidates, parsedQuery.getSelectFieldMappings());
     }
 
 
@@ -264,8 +296,6 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
             //Added For Graphite Metrics
             Timer.Context timeSearch = searchTimer.time();
 
-            //REfactor this out and have it return a modified parseResults that will create the candidateResults from
-            //the hit results and then keep that
             //set the timeout on the scroll cursor to 6 seconds and set the number of values returned per shard to 100.
             //The settings for the scroll aren't tested and so we aren't sure what vlaues would be best in a production enviroment
             //TODO: review this and make them not magic numbers when acking this PR.