You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/11/06 21:38:14 UTC

[20/50] [abbrv] usergrid git commit: -Convert search in ES to use term queries instead of scroll with filters. -Fix delete so we remove connections from Elasticsearch as well as the entity on entity delete event.

- Convert Elasticsearch (ES) searches to use term queries instead of scrolling with filters.
- Fix delete so that, on an entity delete event, the entity's connections are removed from Elasticsearch along with the entity itself.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/22beca2c
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/22beca2c
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/22beca2c

Branch: refs/heads/asf-site
Commit: 22beca2cb3714833ddf6a748522c6bead17fe9f2
Parents: 2686054
Author: Michael Russo <mi...@gmail.com>
Authored: Wed Oct 28 15:58:41 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Wed Oct 28 15:58:41 2015 -0700

----------------------------------------------------------------------
 .../corepersistence/index/IndexServiceImpl.java |   2 +-
 .../persistence/index/CandidateResult.java      |  11 +-
 .../persistence/index/EntityIndexBatch.java     |   2 +-
 .../usergrid/persistence/index/IndexFig.java    |   6 +
 .../index/impl/DeIndexOperation.java            |   5 +
 .../index/impl/EsEntityIndexBatchImpl.java      |  34 ++++-
 .../index/impl/EsEntityIndexImpl.java           | 149 +++++++++++--------
 .../persistence/index/impl/IndexingUtils.java   |   2 +-
 8 files changed, 137 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index ccb6221..b2a1a2a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -219,7 +219,7 @@ public class IndexServiceImpl implements IndexService {
                 //collect results into a single batch
                 .collect( () -> ei.createBatch(), ( batch, candidateResult ) -> {
                     logger.debug( "Deindexing on edge {} for entity {} added to batch",searchEdge , entityId );
-                    batch.deindex( searchEdge, candidateResult );
+                    batch.deindex( candidateResult );
                 } )
                     //return the future from the batch execution
                 .flatMap( batch ->Observable.just(batch.build()) );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java
index 2375509..9b1898c 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java
@@ -32,10 +32,12 @@ import org.apache.usergrid.persistence.model.entity.Id;
 public class CandidateResult implements EntityVersion {
     private final Id entityId;
     private final UUID entityVersion;
+    private final String docId;
 
-    public CandidateResult( Id entityId, UUID entityVersion ) {
+    public CandidateResult( Id entityId, UUID entityVersion, String docId ) {
         this.entityId = entityId;
         this.entityVersion = entityVersion;
+        this.docId = docId;
     }
 
     @Override
@@ -48,6 +50,10 @@ public class CandidateResult implements EntityVersion {
         return entityId;
     }
 
+    public String getDocId() {
+        return docId;
+    }
+
 
     @Override
     public boolean equals( final Object o ) {
@@ -66,6 +72,9 @@ public class CandidateResult implements EntityVersion {
         if ( !entityVersion.equals( that.entityVersion ) ) {
             return false;
         }
+        if ( !docId.equals( that.docId ) ) {
+            return false;
+        }
 
         return true;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index 98652c1..17dd4d3 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -63,7 +63,7 @@ public interface EntityIndexBatch {
     EntityIndexBatch deindex( final SearchEdge searchEdge, final Id id, final UUID version );
 
 
-
+    EntityIndexBatch deindex( final CandidateResult candidateResult);
 
     /**
      * get the batches

http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index e093d7d..33f199a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -60,6 +60,8 @@ public interface IndexFig extends GuicyFig {
 
     String ELASTICSEARCH_CLIENT_TYPE = "elasticsearch.client.type";
 
+    String ELASTICSEARCH_VERSION_QUERY_LIMIT = "elasticsearch.version_query_limit";
+
 
     /**
      * Comma-separated list of Elasticsearch hosts.
@@ -199,4 +201,8 @@ public interface IndexFig extends GuicyFig {
     @Default("1000")
     @Key( "elasticsearch_queue_error_sleep_ms" )
     long getSleepTimeForQueueError();
+
+    @Default("1000")
+    @Key( ELASTICSEARCH_VERSION_QUERY_LIMIT )
+    int getVersionQueryLimit();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
index 4060dac..dbecf8a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
@@ -61,6 +61,11 @@ public class DeIndexOperation implements BatchOperation {
         this.documentId = createIndexDocId( applicationScope, id, version, searchEdge );
     }
 
+    public DeIndexOperation( String[] indexes, String docId) {
+        this.indexes = indexes;
+        this.documentId = docId;
+    }
+
 
     @Override
     public void doOperation( final Client client, final BulkRequestBuilder bulkRequest ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 68830ca..a8fb751 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -84,23 +84,42 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     @Override
     public EntityIndexBatch deindex( final SearchEdge searchEdge, final Id id, final UUID version ) {
 
-        IndexValidationUtils.validateSearchEdge( searchEdge );
-        ValidationUtils.verifyIdentity( id );
+        IndexValidationUtils.validateSearchEdge(searchEdge);
+        ValidationUtils.verifyIdentity(id);
         ValidationUtils.verifyVersion( version );
 
         String[] indexes = entityIndex.getIndexes();
         //get the default index if no alias exists yet
         if ( indexes == null || indexes.length == 0 ) {
-           throw new IllegalStateException("No indexes exist for " + indexLocationStrategy.getAlias().getWriteAlias());
+            throw new IllegalStateException("No indexes exist for " + indexLocationStrategy.getAlias().getWriteAlias());
         }
 
         if ( log.isDebugEnabled() ) {
             log.debug( "Deindexing to indexes {} with scope {} on edge {} with id {} and version {} ",
-                    new Object[] { indexes, applicationScope, searchEdge, id, version } );
+                new Object[] { indexes, applicationScope, searchEdge, id, version } );
         }
 
 
-        container.addDeIndexRequest( new DeIndexOperation( indexes, applicationScope, searchEdge, id, version ) );
+        container.addDeIndexRequest(new DeIndexOperation(indexes, applicationScope, searchEdge, id, version));
+
+        return this;
+    }
+
+    public EntityIndexBatch deindexWithDocId( final String docId ) {
+
+        String[] indexes = entityIndex.getIndexes();
+        //get the default index if no alias exists yet
+        if ( indexes == null || indexes.length == 0 ) {
+            throw new IllegalStateException("No indexes exist for " + indexLocationStrategy.getAlias().getWriteAlias());
+        }
+
+        if ( log.isDebugEnabled() ) {
+            log.debug( "Deindexing to indexes {} with with documentId {} ",
+                new Object[] { indexes, docId } );
+        }
+
+
+        container.addDeIndexRequest( new DeIndexOperation( indexes, docId ) );
 
         return this;
     }
@@ -117,6 +136,11 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
         return deindex( searchEdge, entity.getId(), entity.getVersion() );
     }
+    @Override
+    public EntityIndexBatch deindex( final CandidateResult entity ) {
+
+        return deindexWithDocId(entity.getDocId());
+    }
 
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 9be591e..0b7f2d7 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -65,6 +65,7 @@ import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.aggregations.metrics.sum.Sum;
 import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
+import org.elasticsearch.search.sort.SortOrder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -389,7 +390,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
         final ParsedQuery parsedQuery = ParsedQueryBuilder.build(query);
 
         final SearchRequestBuilder srb = searchRequest.getBuilder( searchEdge, searchTypes, parsedQuery, limit, offset )
-            .setTimeout( TimeValue.timeValueMillis( queryTimeout ) );
+            .setTimeout(TimeValue.timeValueMillis(queryTimeout));
 
         if ( logger.isDebugEnabled() ) {
             logger.debug( "Searching index (read alias): {}\n  nodeId: {}, edgeType: {},  \n type: {}\n   query: {} ",
@@ -427,57 +428,62 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
 
          */
         IndexValidationUtils.validateSearchEdge(edge);
-        Preconditions.checkNotNull( entityId, "entityId cannot be null" );
+        Preconditions.checkNotNull(entityId, "entityId cannot be null");
 
         SearchResponse searchResponse;
-
         List<CandidateResult> candidates = new ArrayList<>();
 
-        final ParsedQuery parsedQuery = ParsedQueryBuilder.build( "select *" );
-
-        final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder();
-
-        //I can't just search on the entity Id.
+        //never let the limit be less than 2 as there are potential indefinite paging issues
+        final int searchLimit = Math.max(2, indexFig.getVersionQueryLimit());
 
-        FilterBuilder entityEdgeFilter = FilterBuilders.termFilter(IndexingUtils.EDGE_NODE_ID_FIELDNAME,
-            IndexingUtils.nodeId(edge.getNodeId()));
+        final QueryBuilder entityQuery = QueryBuilders
+            .termQuery(IndexingUtils.EDGE_NODE_ID_FIELDNAME, IndexingUtils.nodeId(edge.getNodeId()));
 
-        srb.setPostFilter(entityEdgeFilter);
+        final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder();
 
         if ( logger.isDebugEnabled() ) {
-            logger.debug( "Searching for marked versions in index (read alias): {}\n  nodeId: {},\n   query: {} ",
+            logger.debug( "Searching for edges in (read alias): {}\n  nodeId: {},\n   query: {} ",
                 this.alias.getReadAlias(),entityId, srb );
         }
 
         try {
-            //Added For Graphite Metrics
-
-            //set the timeout on the scroll cursor to 6 seconds and set the number of values returned per shard to 100.
-            //The settings for the scroll aren't tested and so we aren't sure what vlaues would be best in a production enviroment
-            //TODO: review this and make them not magic numbers when acking this PR.
-            searchResponse = srb.setScroll( new TimeValue( 6000 ) ).setSize( 100 ).execute().actionGet();
 
+            long queryTimestamp = 0L;
 
             while(true){
-                //add search result hits to some sort of running tally of hits.
-                candidates = aggregateScrollResults( candidates, searchResponse, null );
 
-                SearchScrollRequestBuilder ssrb = searchRequestBuilderStrategyV2
-                    .getScrollBuilder( searchResponse.getScrollId() )
-                    .setScroll( new TimeValue( 6000 ) );
+                QueryBuilder timestampQuery =  QueryBuilders
+                    .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME)
+                    .gte(queryTimestamp);
 
-                //TODO: figure out how to log exactly what we're putting into elasticsearch
-                //                if ( logger.isDebugEnabled() ) {
-                //                    logger.debug( "Scroll search using query: {} ",
-                //                        ssrb.toString() );
-                //                }
+                QueryBuilder finalQuery = QueryBuilders
+                    .boolQuery()
+                    .must(entityQuery)
+                    .must(timestampQuery);
 
-                searchResponse = ssrb.execute().actionGet();
+                searchResponse = srb
+                    .setQuery(finalQuery)
+                    .setSize(searchLimit)
+                    .addSort(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME, SortOrder.ASC)
+                    .execute()
+                    .actionGet();
 
-                if (searchResponse.getHits().getHits().length == 0) {
+                int responseSize = searchResponse.getHits().getHits().length;
+                if(responseSize == 0){
                     break;
                 }
 
+                // update queryTimestamp to be the timestamp of the last entity returned from the query
+                queryTimestamp = (long) searchResponse
+                    .getHits().getAt(responseSize - 1)
+                    .getSource().get(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME);
+
+                candidates = aggregateScrollResults(candidates, searchResponse, null);
+
+                if(responseSize < searchLimit){
+
+                    break;
+                }
 
             }
         }
@@ -488,7 +494,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
         }
         failureMonitor.success();
 
-        return new CandidateResults( candidates, parsedQuery.getSelectFieldMappings());
+        return new CandidateResults( candidates, Collections.EMPTY_SET);
     }
 
 
@@ -496,61 +502,74 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
     public CandidateResults getAllEntityVersionsBeforeMarkedVersion( final Id entityId, final UUID markedVersion ) {
 
         Preconditions.checkNotNull( entityId, "entityId cannot be null" );
-        Preconditions.checkNotNull( markedVersion, "markedVersion cannot be null" );
+        Preconditions.checkNotNull(markedVersion, "markedVersion cannot be null");
         ValidationUtils.verifyVersion(markedVersion);
 
         SearchResponse searchResponse;
-
         List<CandidateResult> candidates = new ArrayList<>();
 
-        final ParsedQuery parsedQuery = ParsedQueryBuilder.build( "select *" );
+        final long markedTimestamp = markedVersion.timestamp();
 
-        final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder();
-
-        FilterBuilder entityIdFilter = FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME,
-            IndexingUtils.entityId(entityId));
+        // never let the limit be less than 2 as there are potential indefinite paging issues
+        final int searchLimit = Math.max(2, indexFig.getVersionQueryLimit());
 
-        srb.setPostFilter(entityIdFilter);
+        // this query will find the document for the entity itself
+        final QueryBuilder entityQuery = QueryBuilders
+            .termQuery(IndexingUtils.ENTITY_ID_FIELDNAME, IndexingUtils.entityId(entityId));
 
+        // this query will find all the documents where this entity is a source/target node
+        final QueryBuilder nodeQuery = QueryBuilders
+            .termQuery(IndexingUtils.EDGE_NODE_ID_FIELDNAME, IndexingUtils.nodeId(entityId));
 
-
-        if ( logger.isDebugEnabled() ) {
-            logger.debug( "Searching for marked versions in index (read alias): {}\n  nodeId: {},\n   query: {} ",
-                this.alias.getReadAlias(),entityId, srb );
-        }
+        final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder();
 
         try {
-            //Added For Graphite Metrics
 
-            //set the timeout on the scroll cursor to 6 seconds and set the number of values returned per shard to 100.
-            //The settings for the scroll aren't tested and so we aren't sure what vlaues would be best in a production enviroment
-            //TODO: review this and make them not magic numbers when acking this PR.
-            searchResponse = srb.setScroll( new TimeValue( 6000 ) ).setSize( 100 ).execute().actionGet();
+            long queryTimestamp = 0L;
 
-            //list that will hold all of the search hits
+            while(true){
 
+                QueryBuilder timestampQuery =  QueryBuilders
+                    .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME)
+                    .gte(queryTimestamp)
+                    .lte(markedTimestamp);
+
+                QueryBuilder entityQueryWithTimestamp = QueryBuilders
+                    .boolQuery()
+                    .must(entityQuery)
+                    .must(timestampQuery);
+
+                QueryBuilder finalQuery = QueryBuilders
+                    .boolQuery()
+                    .should(entityQueryWithTimestamp)
+                    .should(nodeQuery)
+                    .minimumNumberShouldMatch(1);
+
+                searchResponse = srb
+                    .setQuery(finalQuery)
+                    .setSize(searchLimit)
+                    .addSort(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME, SortOrder.ASC)
+                    .execute()
+                    .actionGet();
 
-            while(true){
-                //add search result hits to some sort of running tally of hits.
-                candidates = aggregateScrollResults( candidates, searchResponse, markedVersion);
+                int responseSize = searchResponse.getHits().getHits().length;
+                if(responseSize == 0){
+                    break;
+                }
+
+                candidates = aggregateScrollResults(candidates, searchResponse, markedVersion);
 
-                SearchScrollRequestBuilder ssrb = searchRequestBuilderStrategyV2
-                    .getScrollBuilder( searchResponse.getScrollId() )
-                    .setScroll( new TimeValue( 6000 ) );
+                // update queryTimestamp to be the timestamp of the last entity returned from the query
+                queryTimestamp = (long) searchResponse
+                    .getHits().getAt(responseSize - 1)
+                    .getSource().get(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME);
 
-                //TODO: figure out how to log exactly what we're putting into elasticsearch
-//                if ( logger.isDebugEnabled() ) {
-//                    logger.debug( "Scroll search using query: {} ",
-//                        ssrb.toString() );
-//                }
 
-                searchResponse = ssrb.execute().actionGet();
+                if(responseSize < searchLimit){
 
-                if (searchResponse.getHits().getHits().length == 0) {
                     break;
                 }
 
-
             }
         }
         catch ( Throwable t ) {
@@ -560,7 +579,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
         }
         failureMonitor.success();
 
-        return new CandidateResults( candidates, parsedQuery.getSelectFieldMappings());
+        return new CandidateResults( candidates, Collections.EMPTY_SET);
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
index 9e06fa6..18cb928 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
@@ -241,7 +241,7 @@ public class IndexingUtils {
 
         Id entityId = new SimpleId( UUID.fromString( entityUUID ), entityType );
 
-        return new CandidateResult( entityId, UUID.fromString( versionUUID ) );
+        return new CandidateResult( entityId, UUID.fromString( versionUUID ), documentId );
     }