You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2018/10/23 03:17:48 UTC

[10/16] usergrid git commit: Several indexing fixes:

Several indexing fixes:

1. remove possible duplicate entities from query results when using collection
   setting indexConsistency=latest
2. deindex all but the latest candidate for an entity, even when using
   indexConsistency=latest
3. fix indexing bug when collection settings have been added but the "fields"
   setting does not exist (a missing "fields" is now treated like fields=all)
4. by default, deindex a deleted entity even when indexing is off for the
   collection (this will clean up old index documents). This is controlled by
   the usergrid.entityManager.deindex_deleted_when_collection_indexing_off
   property (default: true).


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1feb3dba
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1feb3dba
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1feb3dba

Branch: refs/heads/master
Commit: 1feb3dbab88145c9d5483b05bced03c2f6bafa79
Parents: 35a22ed
Author: Mike Dunker <md...@google.com>
Authored: Thu Mar 8 00:32:39 2018 -0800
Committer: Keyur Karnik <ke...@gmail.com>
Committed: Tue Aug 28 16:41:44 2018 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |   4 +-
 .../corepersistence/EntityManagerFig.java       |   4 +
 .../corepersistence/index/IndexServiceImpl.java |   5 +
 .../read/search/CandidateEntityFilter.java      | 109 +++++++++++++++----
 .../queue/settings/IndexConsistency.java        |   2 +-
 5 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/1feb3dba/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 7a4c781..ed33201 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -638,7 +638,9 @@ public class CpEntityManager implements EntityManager {
 
         Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
 
-        if ( !skipIndexingForType( entityId.getType() ) ) {
+        // may still want to delete index entries even if indexing is turned off for new updates
+        if ( entityManagerFig.deindexDeletedWhenCollectionIndexingOff() ||
+            !skipIndexingForType( entityId.getType() ) ) {
             indexService.queueEntityDelete( applicationScope, entityId );
         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1feb3dba/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
index 3c8a53f..23cf1c3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
@@ -42,6 +42,10 @@ public interface EntityManagerFig extends GuicyFig {
     @Default( "false" )
     boolean getDeindexOnUpdate();
 
+    @Key( "usergrid.entityManager.deindex_deleted_when_collection_indexing_off")
+    @Default( "true" )
+    boolean deindexDeletedWhenCollectionIndexingOff(); // when true (the default), a deleted entity is queued for deindexing even if indexing is skipped for its type
+
     /**
      * Comma-separated list of one or more Amazon regions to use if multiregion
      * is set to true.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1feb3dba/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 32470f6..b1d493e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -199,6 +199,11 @@ public class IndexServiceImpl implements IndexService {
 
             Object fields = jsonMapData.get("fields");
 
+            // if "fields" field doesn't exist, should treat like fields=all
+            if ( fields == null ) {
+                return Optional.absent();
+            }
+
             if ( fields != null && fields instanceof String && "all".equalsIgnoreCase(fields.toString())) {
                 return Optional.absent();
             }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1feb3dba/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
index 5dcbd27..f9c93b0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.search;
 
 
 import java.util.*;
+import java.util.logging.Filter;
 
 import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
 import org.apache.usergrid.persistence.index.*;
@@ -234,6 +235,7 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
         private final List<FilterResult<Candidate>> candidateResults;
         private final IndexProducer indexProducer;
         private final EntitySet entitySet;
+        private List<FilterResult<Candidate>> dedupedCandidateResults = new ArrayList<>();
 
 
         public EntityVerifier( final EntityIndexBatch batch, final EntitySet entitySet,
@@ -252,7 +254,9 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
          */
         public void merge(boolean keepStaleEntries, String query) {
 
-            for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
+            filterDuplicateCandidates(query);
+
+            for ( final FilterResult<Candidate> candidateResult : dedupedCandidateResults ) {
                 validate( candidateResult , keepStaleEntries, query);
             }
 
@@ -287,6 +291,69 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
         }
 
 
+        /**
+         * Keeps only the newest candidate for each entity id in dedupedCandidateResults and deindexes
+         * older duplicates whose version differs from the entity version held by entitySet. Whether
+         * stale entries are kept is irrelevant here: only candidates older than another candidate in
+         * the same result set are removed.
+         *
+         * @param query query text, used only in the warning that is logged for deindexed candidates
+         */
+        private void filterDuplicateCandidates(String query) {
+
+            // reset so that calling merge() more than once cannot accumulate repeated results
+            dedupedCandidateResults.clear();
+
+            final Map<Id, UUID> latestEntityVersions = new HashMap<>();
+
+            // first pass: find latest version for each entityID
+            for ( final FilterResult<Candidate> filterResult : candidateResults ) {
+                final CandidateResult candidateResult = filterResult.getValue().getCandidateResult();
+                final Id candidateId = candidateResult.getId();
+                final UUID candidateVersion = candidateResult.getVersion();
+
+                final UUID previousCandidateVersion = latestEntityVersions.get(candidateId);
+                if (previousCandidateVersion == null ||
+                    UUIDComparator.staticCompare(candidateVersion, previousCandidateVersion) > 0) {
+                    latestEntityVersions.put(candidateId, candidateVersion);
+                }
+            }
+
+            // second pass: save newest results and deindex older ones
+            for ( final FilterResult<Candidate> filterResult : candidateResults ) {
+                final Candidate candidate = filterResult.getValue();
+                final CandidateResult candidateResult = candidate.getCandidateResult();
+                final Id candidateId = candidateResult.getId();
+                final UUID candidateVersion = candidateResult.getVersion();
+
+                if (candidateVersion.equals(latestEntityVersions.get(candidateId))) {
+                    // save candidate
+                    dedupedCandidateResults.add(filterResult);
+                    continue;
+                }
+
+                final MvccEntity entity = entitySet.getEntity( candidateId );
+                if (entity == null) {
+                    // assumed: getEntity returns null if the entity was not loaded; drop the candidate without deindexing
+                    continue;
+                }
+
+                // deindex if not the current version in database
+                final UUID databaseVersion = entity.getVersion();
+                if (!candidateVersion.equals(databaseVersion)) {
+                    final SearchEdge searchEdge = candidate.getSearchEdge();
+                    logger.warn( "Found old stale entity on edge {} for entityId {} Entity version {} ({}).  Candidate version {} ({}). Will not be returned in result set. Query = [{}]",
+                        searchEdge, entity.getId().getUuid(),
+                        databaseVersion, DateUtils.instance.formatIso8601Date(UUIDTimeStampToDate(databaseVersion)),
+                        candidateVersion, DateUtils.instance.formatIso8601Date(UUIDTimeStampToDate(candidateVersion)),
+                        query );
+
+                    batch.deindex(searchEdge, entity.getId(), candidateVersion);
+                }
+            }
+        }
+
+
         private void validate( final FilterResult<Candidate> filterResult, boolean keepStaleEntries, String query ) {
 
             final Candidate candidate = filterResult.getValue();
@@ -314,7 +381,7 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
             }
 
 
-            final UUID entityVersion = entity.getVersion();
+            final UUID databaseVersion = entity.getVersion();
             final Id entityId = entity.getId();
 
             // The entity is marked as deleted
@@ -323,7 +390,7 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
                 // when updating entities, we don't delete all previous versions from ES so this action is expected
                 if(logger.isDebugEnabled()){
                     logger.debug( "Deindexing deleted entity on edge {} for entityId {} and version {}",
-                        searchEdge, entityId, entityVersion);
+                        searchEdge, entityId, databaseVersion);
                 }
 
                 batch.deindex( searchEdge, entityId, candidateVersion );
@@ -331,25 +398,21 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
             }
 
             // entity exists and is newer than ES version, could be a missed or slow index event
-            if ( UUIDComparator.staticCompare(entityVersion, candidateVersion) > 0 ) {
+            if ( UUIDComparator.staticCompare(databaseVersion, candidateVersion) > 0 ) {
 
                Date candidateTimeStamp = UUIDTimeStampToDate(candidateVersion);
-               Date entityTimeStamp = UUIDTimeStampToDate(entityVersion);
-
-               Map<String,String> fields = new HashMap<>();
-               for  (Field field : entity.getEntity().get().getFields()) {
-                   fields.put(field.getName(),String.valueOf(field.getValue()));
-               }
-
-               logger.warn( "Found stale entity on edge {} for entityId {} Entity version date = {}.  Candidate version date = {}. Will be returned in result set = {} Query = [{}] Entity fields = {}",
-                   searchEdge,
-                   entityId.getUuid(),
-                   DateUtils.instance.formatIso8601Date(entityTimeStamp),
-                   DateUtils.instance.formatIso8601Date(candidateTimeStamp),
-                   keepStaleEntries,
-                   query,
-                   fields
-               );
+               Date entityTimeStamp = UUIDTimeStampToDate(databaseVersion);
+
+                logger.warn( "Found stale entity on edge {} for entityId {} Entity version {} ({}).  Candidate version {} ({}). Will be returned in result set = {} Query = [{}]",
+                    candidate.getSearchEdge(),
+                    entity.getId().getUuid(),
+                    databaseVersion,
+                    DateUtils.instance.formatIso8601Date(entityTimeStamp),
+                    candidateVersion,
+                    DateUtils.instance.formatIso8601Date(candidateTimeStamp),
+                    keepStaleEntries,
+                    query
+                );
 
                 if (!keepStaleEntries) {
                     batch.deindex(searchEdge, entityId, candidateVersion);
@@ -359,18 +422,18 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
 
             //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
             //remove the ES record, since the read in cass should cause a read repair, just ignore
-            if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
+            if ( UUIDComparator.staticCompare( candidateVersion, databaseVersion ) > 0 ) {
 
                 logger.warn(
                     "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}.  Repair should be run",
-                        searchEdge, entityId, entityVersion);
+                        searchEdge, entityId, databaseVersion);
 
                   //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
 
                 return;
             }
 
-            //they're the same add it
+            // add the result
 
             final Entity returnEntity = entity.getEntity().get();
             if(isGeo){

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1feb3dba/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java
index 531716a..fdbd399 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java
@@ -51,7 +51,7 @@ public enum IndexConsistency {
     public static IndexConsistency get(String name) {
         IndexConsistency queueIndexingStrategy =  NAME_MAP.get(name);
         if (queueIndexingStrategy == null) {
-            return LATEST;
+            return STRICT;
         }
         return queueIndexingStrategy;
     }