You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/07/14 05:38:13 UTC

[07/50] usergrid git commit: Enhance some of the logic around deleting edges.

Enhance some of the logic around deleting edges.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9c4b5242
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9c4b5242
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9c4b5242

Branch: refs/heads/master
Commit: 9c4b5242a169cb4e90922c3fc7755be3bdac8be8
Parents: 09280b1
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Jun 27 21:13:45 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Mon Jun 27 21:13:45 2016 -0700

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      | 40 ++++++++---------
 .../asyncevents/EventBuilderImpl.java           |  7 ++-
 .../index/impl/EsEntityIndexImpl.java           | 47 +++++++-------------
 3 files changed, 37 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/9c4b5242/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 39b3161..6e1bade 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -524,44 +524,42 @@ public class CpRelationManager implements RelationManager {
         }
 
         Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() );
-        org.apache.usergrid.persistence.model.entity.Entity memberEntity = ( ( CpEntityManager ) em ).load( entityId );
-
 
         // remove edge from collection to item
         GraphManager gm = managerCache.getGraphManager( applicationScope );
 
 
-        List<Edge> removedEdges = new ArrayList<>();
-        //run our delete
-        gm.loadEdgeVersions(
-            CpNamingUtils.createEdgeFromCollectionName( cpHeadEntity.getId(), collectionName, memberEntity.getId() ) )
-                .flatMap(edge -> gm.markEdge(edge)).flatMap(edge -> gm.deleteEdge(edge))
-                .doOnNext(edge -> removedEdges.add(edge)).toBlocking()
-          .lastOrDefault(null);
 
+        // mark the edge versions and take the first for later delete edge queue event ( load is descending )
+        final Edge markedSourceEdge = gm.loadEdgeVersions(
+            CpNamingUtils.createEdgeFromCollectionName( cpHeadEntity.getId(), collectionName, entityId ) )
+                .flatMap(edge -> gm.markEdge(edge)).toBlocking().firstOrDefault(null);
+
+
+        Edge markedReversedEdge = null;
         CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
         if (collection != null && collection.getLinkedCollection() != null) {
             // delete reverse edges
             final String pluralType = InflectionUtils.pluralize( cpHeadEntity.getId().getType() );
-            gm.loadEdgeVersions(
-                    CpNamingUtils.createEdgeFromCollectionName( memberEntity.getId(), pluralType, cpHeadEntity.getId() ) )
-                    .flatMap(reverseEdge -> gm.markEdge(reverseEdge))
-                    .flatMap(reverseEdge -> gm.deleteEdge(reverseEdge))
-                    .doOnNext(reverseEdge -> removedEdges.add(reverseEdge))
-                    .toBlocking().lastOrDefault(null);
+            markedReversedEdge = gm.loadEdgeVersions(
+                    CpNamingUtils.createEdgeFromCollectionName( entityId, pluralType, cpHeadEntity.getId() ) )
+                    .flatMap(reverseEdge -> gm.markEdge(reverseEdge)).toBlocking().firstOrDefault(null);
         }
 
 
         /**
-         * Remove from the index
+         * Remove from the index.  This will call gm.deleteEdge which also deletes the reverse edge(s) and de-indexes
+         * older versions of the edge(s).
          *
          */
+        if( markedSourceEdge != null ) {
+            indexService.queueDeleteEdge(applicationScope, markedSourceEdge);
+        }
+        if( markedReversedEdge != null ){
+            indexService.queueDeleteEdge(applicationScope, markedReversedEdge);
 
+        }
 
-        // item not deindexed, only edges
-        removedEdges.forEach(edge -> {
-            indexService.queueDeleteEdge(applicationScope, edge);
-        });
 
         // special handling for roles collection of a group
         if ( headEntity.getType().equals( Group.ENTITY_TYPE ) ) {
@@ -572,7 +570,7 @@ public class CpRelationManager implements RelationManager {
                 if ( path.startsWith( "/roles/" ) ) {
 
                     Entity itemEntity =
-                        em.get( new SimpleEntityRef( memberEntity.getId().getType(), memberEntity.getId().getUuid() ) );
+                        em.get( new SimpleEntityRef( entityId.getType(), entityId.getUuid() ) );
 
                     RoleRef roleRef = SimpleRoleRef.forRoleEntity( itemEntity );
                     em.deleteRole( roleRef.getApplicationRoleName(), Optional.fromNullable(itemEntity) );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9c4b5242/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 02a7588..bbdce5a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -96,10 +96,9 @@ public class EventBuilderImpl implements EventBuilder {
             logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
         }
 
-        return indexService.deleteIndexEdge( applicationScope, edge ).flatMap( batch -> {
-            final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
-            return gm.deleteEdge( edge ).map( deletedEdge -> batch );
-        } );
+        final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
+        return gm.deleteEdge( edge )
+            .flatMap( deletedEdge -> indexService.deleteIndexEdge( applicationScope, deletedEdge ));
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9c4b5242/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 3b60b57..dc110f7 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -466,8 +466,8 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
         SearchResponse searchResponse;
         List<CandidateResult> candidates = new ArrayList<>();
 
-        //never let the limit be less than 2 as there are potential indefinite paging issues
-        final int searchLimit = Math.max(2, indexFig.getVersionQueryLimit());
+        // never let this fetch more than 100 to save memory
+        final int searchLimit = Math.min(100, indexFig.getVersionQueryLimit());
 
         final QueryBuilder entityQuery = QueryBuilders
             .termQuery(IndexingUtils.EDGE_NODE_ID_FIELDNAME, IndexingUtils.nodeId(edge.getNodeId()));
@@ -485,41 +485,24 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
 
             long queryTimestamp = 0L;
 
-            while(true){
-
-                QueryBuilder timestampQuery =  QueryBuilders
-                    .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME)
-                    .gte(queryTimestamp);
-
-                QueryBuilder finalQuery = QueryBuilders
-                    .boolQuery()
-                    .must(entityQuery)
-                    .must(timestampQuery);
-
-                searchResponse = srb
-                    .setQuery(finalQuery)
-                    .setSize(searchLimit)
-                    .execute()
-                    .actionGet();
-
-                int responseSize = searchResponse.getHits().getHits().length;
-                if(responseSize == 0){
-                    break;
-                }
 
-                // update queryTimestamp to be the timestamp of the last entity returned from the query
-                queryTimestamp = (long) searchResponse
-                    .getHits().getAt(responseSize - 1)
-                    .getSource().get(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME);
+            QueryBuilder timestampQuery =  QueryBuilders
+                .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME)
+                .gte(queryTimestamp);
 
-                candidates = aggregateScrollResults(candidates, searchResponse, null);
+            QueryBuilder finalQuery = QueryBuilders
+                .boolQuery()
+                .must(entityQuery)
+                .must(timestampQuery);
 
-                if(responseSize < searchLimit){
+            searchResponse = srb
+                .setQuery(finalQuery)
+                .setSize(searchLimit)
+                .execute()
+                .actionGet();
 
-                    break;
-                }
+            candidates = aggregateScrollResults(candidates, searchResponse, null);
 
-            }
         }
         catch ( Throwable t ) {
             logger.error( "Unable to communicate with Elasticsearch", t.getMessage() );