You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/07/14 05:38:13 UTC
[07/50] usergrid git commit: Enhance some logic around deleting of
edges.
Enhance some logic around deleting of edges.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9c4b5242
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9c4b5242
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9c4b5242
Branch: refs/heads/master
Commit: 9c4b5242a169cb4e90922c3fc7755be3bdac8be8
Parents: 09280b1
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Jun 27 21:13:45 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Mon Jun 27 21:13:45 2016 -0700
----------------------------------------------------------------------
.../corepersistence/CpRelationManager.java | 40 ++++++++---------
.../asyncevents/EventBuilderImpl.java | 7 ++-
.../index/impl/EsEntityIndexImpl.java | 47 +++++++-------------
3 files changed, 37 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9c4b5242/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 39b3161..6e1bade 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -524,44 +524,42 @@ public class CpRelationManager implements RelationManager {
}
Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() );
- org.apache.usergrid.persistence.model.entity.Entity memberEntity = ( ( CpEntityManager ) em ).load( entityId );
-
// remove edge from collection to item
GraphManager gm = managerCache.getGraphManager( applicationScope );
- List<Edge> removedEdges = new ArrayList<>();
- //run our delete
- gm.loadEdgeVersions(
- CpNamingUtils.createEdgeFromCollectionName( cpHeadEntity.getId(), collectionName, memberEntity.getId() ) )
- .flatMap(edge -> gm.markEdge(edge)).flatMap(edge -> gm.deleteEdge(edge))
- .doOnNext(edge -> removedEdges.add(edge)).toBlocking()
- .lastOrDefault(null);
+ // mark the edge versions and take the first for later delete edge queue event ( load is descending )
+ final Edge markedSourceEdge = gm.loadEdgeVersions(
+ CpNamingUtils.createEdgeFromCollectionName( cpHeadEntity.getId(), collectionName, entityId ) )
+ .flatMap(edge -> gm.markEdge(edge)).toBlocking().firstOrDefault(null);
+
+
+ Edge markedReversedEdge = null;
CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
if (collection != null && collection.getLinkedCollection() != null) {
// delete reverse edges
final String pluralType = InflectionUtils.pluralize( cpHeadEntity.getId().getType() );
- gm.loadEdgeVersions(
- CpNamingUtils.createEdgeFromCollectionName( memberEntity.getId(), pluralType, cpHeadEntity.getId() ) )
- .flatMap(reverseEdge -> gm.markEdge(reverseEdge))
- .flatMap(reverseEdge -> gm.deleteEdge(reverseEdge))
- .doOnNext(reverseEdge -> removedEdges.add(reverseEdge))
- .toBlocking().lastOrDefault(null);
+ markedReversedEdge = gm.loadEdgeVersions(
+ CpNamingUtils.createEdgeFromCollectionName( entityId, pluralType, cpHeadEntity.getId() ) )
+ .flatMap(reverseEdge -> gm.markEdge(reverseEdge)).toBlocking().firstOrDefault(null);
}
/**
- * Remove from the index
+ * Remove from the index. This will call gm.deleteEdge which also deletes the reverse edge(s) and de-indexes
+ * older versions of the edge(s).
*
*/
+ if( markedSourceEdge != null ) {
+ indexService.queueDeleteEdge(applicationScope, markedSourceEdge);
+ }
+ if( markedReversedEdge != null ){
+ indexService.queueDeleteEdge(applicationScope, markedReversedEdge);
+ }
- // item not deindexed, only edges
- removedEdges.forEach(edge -> {
- indexService.queueDeleteEdge(applicationScope, edge);
- });
// special handling for roles collection of a group
if ( headEntity.getType().equals( Group.ENTITY_TYPE ) ) {
@@ -572,7 +570,7 @@ public class CpRelationManager implements RelationManager {
if ( path.startsWith( "/roles/" ) ) {
Entity itemEntity =
- em.get( new SimpleEntityRef( memberEntity.getId().getType(), memberEntity.getId().getUuid() ) );
+ em.get( new SimpleEntityRef( entityId.getType(), entityId.getUuid() ) );
RoleRef roleRef = SimpleRoleRef.forRoleEntity( itemEntity );
em.deleteRole( roleRef.getApplicationRoleName(), Optional.fromNullable(itemEntity) );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9c4b5242/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 02a7588..bbdce5a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -96,10 +96,9 @@ public class EventBuilderImpl implements EventBuilder {
logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
}
- return indexService.deleteIndexEdge( applicationScope, edge ).flatMap( batch -> {
- final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
- return gm.deleteEdge( edge ).map( deletedEdge -> batch );
- } );
+ final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
+ return gm.deleteEdge( edge )
+ .flatMap( deletedEdge -> indexService.deleteIndexEdge( applicationScope, deletedEdge ));
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9c4b5242/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 3b60b57..dc110f7 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -466,8 +466,8 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
SearchResponse searchResponse;
List<CandidateResult> candidates = new ArrayList<>();
- //never let the limit be less than 2 as there are potential indefinite paging issues
- final int searchLimit = Math.max(2, indexFig.getVersionQueryLimit());
+ // never let this fetch more than 100 to save memory
+ final int searchLimit = Math.min(100, indexFig.getVersionQueryLimit());
final QueryBuilder entityQuery = QueryBuilders
.termQuery(IndexingUtils.EDGE_NODE_ID_FIELDNAME, IndexingUtils.nodeId(edge.getNodeId()));
@@ -485,41 +485,24 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
long queryTimestamp = 0L;
- while(true){
-
- QueryBuilder timestampQuery = QueryBuilders
- .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME)
- .gte(queryTimestamp);
-
- QueryBuilder finalQuery = QueryBuilders
- .boolQuery()
- .must(entityQuery)
- .must(timestampQuery);
-
- searchResponse = srb
- .setQuery(finalQuery)
- .setSize(searchLimit)
- .execute()
- .actionGet();
-
- int responseSize = searchResponse.getHits().getHits().length;
- if(responseSize == 0){
- break;
- }
- // update queryTimestamp to be the timestamp of the last entity returned from the query
- queryTimestamp = (long) searchResponse
- .getHits().getAt(responseSize - 1)
- .getSource().get(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME);
+ QueryBuilder timestampQuery = QueryBuilders
+ .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME)
+ .gte(queryTimestamp);
- candidates = aggregateScrollResults(candidates, searchResponse, null);
+ QueryBuilder finalQuery = QueryBuilders
+ .boolQuery()
+ .must(entityQuery)
+ .must(timestampQuery);
- if(responseSize < searchLimit){
+ searchResponse = srb
+ .setQuery(finalQuery)
+ .setSize(searchLimit)
+ .execute()
+ .actionGet();
- break;
- }
+ candidates = aggregateScrollResults(candidates, searchResponse, null);
- }
}
catch ( Throwable t ) {
logger.error( "Unable to communicate with Elasticsearch", t.getMessage() );