You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/06/05 00:47:36 UTC
[3/4] incubator-usergrid git commit: Fixes issue with ignored compaction operation on edge delete
Fixes issue with ignored compaction operation on edge delete
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c582d278
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c582d278
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c582d278
Branch: refs/heads/two-dot-o-dev
Commit: c582d2782fab7d035243cd09b80052588623eb38
Parents: dfda3cf
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Jun 2 15:17:07 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Jun 2 15:17:07 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpRelationManager.java | 18 +--
.../asyncevents/EventBuilderImpl.java | 4 +-
.../impl/stage/NodeDeleteListenerImpl.java | 113 ++++++++-----------
3 files changed, 52 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c582d278/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index bc3c803..c717c5b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -414,7 +414,7 @@ public class CpRelationManager implements RelationManager {
});
}
})
- .toBlocking().lastOrDefault(null);
+ .toBlocking().lastOrDefault( null );
//check if we need to reverse our edges
@@ -622,7 +622,7 @@ public class CpRelationManager implements RelationManager {
final IdBuilder pipelineBuilder =
pipelineBuilderFactory.create( applicationScope ).withCursor( query.getCursor() )
- .withLimit( query.getLimit() ).fromId(cpHeadEntity.getId());
+ .withLimit( query.getLimit() ).fromId( cpHeadEntity.getId() );
final EntityBuilder results;
@@ -791,18 +791,10 @@ public class CpRelationManager implements RelationManager {
final SearchByEdge search = createConnectionSearchByEdge( sourceId, connectionType, targetEntity.getId() );
- //delete all the edges
- final Edge lastEdge =
- gm.loadEdgeVersions( search ).flatMap( returnedEdge -> gm.markEdge( returnedEdge ) ).toBlocking()
+ //delete all the edges and queue their processing
+ gm.loadEdgeVersions( search ).flatMap( returnedEdge -> gm.markEdge( returnedEdge ) ).doOnNext(
+ returnedEdge -> indexService.queueDeleteEdge( applicationScope, returnedEdge ) ).toBlocking()
.lastOrDefault( null );
-
- if ( lastEdge != null ) {
- final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope );
- final EntityIndexBatch batch = ei.createBatch();
-
- SearchEdge indexScope = createSearchEdgeFromSource( lastEdge );
- batch.deindex( indexScope, targetEntity );
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c582d278/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 57f30fd..e4cd4b5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -108,9 +108,9 @@ public class EventBuilderImpl implements EventBuilder {
log.debug( "Deleting in app scope {} with edge {} }", applicationScope, edge );
final Observable<IndexOperationMessage> edgeObservable =
- indexService.deleteIndexEdge( applicationScope, edge ).doOnCompleted( () -> {
+ indexService.deleteIndexEdge( applicationScope, edge ).flatMap( batch -> {
final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
- gm.deleteEdge( edge );
+ return gm.deleteEdge( edge ).map( deletedEdge -> batch );
} );
return edgeObservable;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c582d278/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
index 10ee0f6..e4eb5fc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
@@ -158,112 +158,89 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
//get all edges pointing to the target node and buffer then into groups for deletion
Observable<MarkedEdge> targetEdges =
getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null, null ) )
- .subscribeOn( Schedulers.io() ).flatMap( new Func1<String, Observable<MarkedEdge>>() {
- @Override
- public Observable<MarkedEdge> call( final String edgeType ) {
- return Observable.create( new ObservableIterator<MarkedEdge>( "getTargetEdges" ) {
+ .subscribeOn( Schedulers.io() ).flatMap( edgeType -> Observable.create( new ObservableIterator<MarkedEdge>( "getTargetEdges" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
return storageSerialization.getEdgesToTarget( scope,
new SimpleSearchByEdgeType( node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) );
}
- } );
- }
- } );
+ } ) );
//get all edges pointing to the source node and buffer them into groups for deletion
Observable<MarkedEdge> sourceEdges =
getEdgesTypesFromSource( scope, new SimpleSearchEdgeType( node, null, null ) )
- .subscribeOn( Schedulers.io() ).flatMap( new Func1<String, Observable<MarkedEdge>>() {
- @Override
- public Observable<MarkedEdge> call( final String edgeType ) {
- return Observable.create( new ObservableIterator<MarkedEdge>( "getSourceEdges" ) {
+ .subscribeOn( Schedulers.io() ).flatMap( edgeType -> Observable.create( new ObservableIterator<MarkedEdge>( "getSourceEdges" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
return storageSerialization.getEdgesFromSource( scope,
new SimpleSearchByEdgeType( node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) );
}
- } );
- }
- } );
+ } ) );
//merge both source and target into 1 observable. We'll need to check them all regardless of order
return Observable.merge( targetEdges, sourceEdges )
//buffer and delete marked edges in our buffer size so we're making less trips to cassandra
- .buffer( graphFig.getScanPageSize() ).flatMap( new Func1<List<MarkedEdge>, Observable<MarkedEdge>>() {
- @Override
- public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
+ .buffer( graphFig.getScanPageSize() ).flatMap( markedEdges -> {
- LOG.debug( "Batching {} edges for node {} for deletion", markedEdges.size(), node );
+ LOG.debug( "Batching {} edges for node {} for deletion", markedEdges.size(), node );
- final MutationBatch batch = keyspace.prepareMutationBatch();
+ final MutationBatch batch = keyspace.prepareMutationBatch();
- Set<TargetPair> sourceNodes = new HashSet<>( markedEdges.size() );
- Set<TargetPair> targetNodes = new HashSet<>( markedEdges.size() );
+ Set<TargetPair> sourceNodes = new HashSet<>( markedEdges.size() );
+ Set<TargetPair> targetNodes = new HashSet<>( markedEdges.size() );
- for ( MarkedEdge edge : markedEdges ) {
+ for ( MarkedEdge edge : markedEdges ) {
- //delete the newest edge <= the version on the node delete
+ //delete the newest edge <= the version on the node delete
- //we use the version specified on the delete purposefully. If these edges are re-written
- //at a greater time we want them to exit
- batch.mergeShallow( storageSerialization.deleteEdge( scope, edge, eventTimestamp ) );
+ //we use the version specified on the delete purposefully. If these edges are re-written
+ //at a greater time we want them to exit
+ batch.mergeShallow( storageSerialization.deleteEdge( scope, edge, eventTimestamp ) );
- sourceNodes.add( new TargetPair( edge.getSourceNode(), edge.getType() ) );
- targetNodes.add( new TargetPair( edge.getTargetNode(), edge.getType() ) );
- }
+ sourceNodes.add( new TargetPair( edge.getSourceNode(), edge.getType() ) );
+ targetNodes.add( new TargetPair( edge.getTargetNode(), edge.getType() ) );
+ }
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to connect to casandra", e );
- }
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
- //now delete meta data
+ //now delete meta data
- //delete both the source and target meta data in parallel for the edge we deleted in the
- // previous step
- //if nothing else is using them. We purposefully do not schedule them on a new scheduler
- //we want them running on the i/o thread from the Observable emitting all the edges
+ //delete both the source and target meta data in parallel for the edge we deleted in the
+ // previous step
+ //if nothing else is using them. We purposefully do not schedule them on a new scheduler
+ //we want them running on the i/o thread from the Observable emitting all the edges
- //
- LOG.debug( "About to audit {} source types", sourceNodes.size() );
+ //
+ LOG.debug( "About to audit {} source types", sourceNodes.size() );
- Observable<Integer> sourceMetaCleanup =
- Observable.from( sourceNodes ).flatMap( new Func1<TargetPair, Observable<Integer>>() {
- @Override
- public Observable<Integer> call( final TargetPair targetPair ) {
- return edgeMetaRepair
- .repairSources( scope, targetPair.id, targetPair.edgeType, maxVersion );
- }
- } ).last();
+ Observable<Integer> sourceMetaCleanup =
+ Observable.from( sourceNodes ).flatMap( targetPair -> edgeMetaRepair
+ .repairSources( scope, targetPair.id, targetPair.edgeType, maxVersion ) ).last();
- LOG.debug( "About to audit {} target types", targetNodes.size() );
+ LOG.debug( "About to audit {} target types", targetNodes.size() );
- Observable<Integer> targetMetaCleanup =
- Observable.from( targetNodes ).flatMap( new Func1<TargetPair, Observable<Integer>>() {
- @Override
- public Observable<Integer> call( final TargetPair targetPair ) {
- return edgeMetaRepair
- .repairTargets( scope, targetPair.id, targetPair.edgeType, maxVersion );
- }
- } ).last();
+ Observable<Integer> targetMetaCleanup =
+ Observable.from( targetNodes ).flatMap( targetPair -> edgeMetaRepair
+ .repairTargets( scope, targetPair.id, targetPair.edgeType, maxVersion ) ).last();
- //run both the source/target edge type cleanup, then proceed
- return Observable.merge( sourceMetaCleanup, targetMetaCleanup ).lastOrDefault( null )
- .flatMap( new Func1<Integer, Observable<MarkedEdge>>() {
- @Override
- public Observable<MarkedEdge> call( final Integer integer ) {
- return Observable.from( markedEdges );
- }
- } );
- }
+ //run both the source/target edge type cleanup, then proceed
+ return Observable.merge( sourceMetaCleanup, targetMetaCleanup ).lastOrDefault( null )
+ .flatMap( new Func1<Integer, Observable<MarkedEdge>>() {
+ @Override
+ public Observable<MarkedEdge> call( final Integer integer ) {
+ return Observable.from( markedEdges );
+ }
+ } );
} );
}