You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/06/02 23:17:35 UTC

[19/19] incubator-usergrid git commit: Fixes issue with ignored compaction operation on edge delete

Fixes issue with ignored compaction operation on edge delete


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c582d278
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c582d278
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c582d278

Branch: refs/heads/USERGRID-685
Commit: c582d2782fab7d035243cd09b80052588623eb38
Parents: dfda3cf
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Jun 2 15:17:07 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Jun 2 15:17:07 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      |  18 +--
 .../asyncevents/EventBuilderImpl.java           |   4 +-
 .../impl/stage/NodeDeleteListenerImpl.java      | 113 ++++++++-----------
 3 files changed, 52 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c582d278/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index bc3c803..c717c5b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -414,7 +414,7 @@ public class CpRelationManager implements RelationManager {
                     });
                 }
             })
-            .toBlocking().lastOrDefault(null);
+            .toBlocking().lastOrDefault( null );
 
         //check if we need to reverse our edges
 
@@ -622,7 +622,7 @@ public class CpRelationManager implements RelationManager {
 
         final IdBuilder pipelineBuilder =
             pipelineBuilderFactory.create( applicationScope ).withCursor( query.getCursor() )
-                                  .withLimit( query.getLimit() ).fromId(cpHeadEntity.getId());
+                                  .withLimit( query.getLimit() ).fromId( cpHeadEntity.getId() );
 
 
         final EntityBuilder results;
@@ -791,18 +791,10 @@ public class CpRelationManager implements RelationManager {
 
         final SearchByEdge search = createConnectionSearchByEdge( sourceId, connectionType, targetEntity.getId() );
 
-        //delete all the edges
-        final Edge lastEdge =
-            gm.loadEdgeVersions( search ).flatMap( returnedEdge -> gm.markEdge( returnedEdge ) ).toBlocking()
+        //delete all the edges and queue their processing
+        gm.loadEdgeVersions( search ).flatMap( returnedEdge -> gm.markEdge( returnedEdge ) ).doOnNext(
+                returnedEdge -> indexService.queueDeleteEdge( applicationScope, returnedEdge ) ).toBlocking()
               .lastOrDefault( null );
-
-        if ( lastEdge != null ) {
-            final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope );
-            final EntityIndexBatch batch = ei.createBatch();
-
-            SearchEdge indexScope = createSearchEdgeFromSource( lastEdge );
-            batch.deindex( indexScope, targetEntity );
-        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c582d278/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 57f30fd..e4cd4b5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -108,9 +108,9 @@ public class EventBuilderImpl implements EventBuilder {
         log.debug( "Deleting in app scope {} with edge {} }", applicationScope, edge );
 
         final Observable<IndexOperationMessage> edgeObservable =
-            indexService.deleteIndexEdge( applicationScope, edge ).doOnCompleted( () -> {
+            indexService.deleteIndexEdge( applicationScope, edge ).flatMap( batch -> {
                 final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
-                gm.deleteEdge( edge );
+                return gm.deleteEdge( edge ).map( deletedEdge -> batch );
             } );
 
         return edgeObservable;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c582d278/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
index 10ee0f6..e4eb5fc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
@@ -158,112 +158,89 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
         //get all edges pointing to the target node and buffer then into groups for deletion
         Observable<MarkedEdge> targetEdges =
                 getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null, null ) )
-                        .subscribeOn( Schedulers.io() ).flatMap( new Func1<String, Observable<MarkedEdge>>() {
-                    @Override
-                    public Observable<MarkedEdge> call( final String edgeType ) {
-                        return Observable.create( new ObservableIterator<MarkedEdge>( "getTargetEdges" ) {
+                        .subscribeOn( Schedulers.io() ).flatMap( edgeType -> Observable.create( new ObservableIterator<MarkedEdge>( "getTargetEdges" ) {
                             @Override
                             protected Iterator<MarkedEdge> getIterator() {
                                 return storageSerialization.getEdgesToTarget( scope,
                                         new SimpleSearchByEdgeType( node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING,  Optional.<Edge>absent() ) );
                             }
-                        } );
-                    }
-                } );
+                        } ) );
 
 
         //get all edges pointing to the source node and buffer them into groups for deletion
         Observable<MarkedEdge> sourceEdges =
                 getEdgesTypesFromSource( scope, new SimpleSearchEdgeType( node, null, null ) )
-                        .subscribeOn( Schedulers.io() ).flatMap( new Func1<String, Observable<MarkedEdge>>() {
-                    @Override
-                    public Observable<MarkedEdge> call( final String edgeType ) {
-                        return Observable.create( new ObservableIterator<MarkedEdge>( "getSourceEdges" ) {
+                        .subscribeOn( Schedulers.io() ).flatMap( edgeType -> Observable.create( new ObservableIterator<MarkedEdge>( "getSourceEdges" ) {
                             @Override
                             protected Iterator<MarkedEdge> getIterator() {
                                 return storageSerialization.getEdgesFromSource( scope,
                                         new SimpleSearchByEdgeType( node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING,  Optional.<Edge>absent() ) );
                             }
-                        } );
-                    }
-                } );
+                        } ) );
 
         //merge both source and target into 1 observable.  We'll need to check them all regardless of order
         return Observable.merge( targetEdges, sourceEdges )
 
                 //buffer and delete marked edges in our buffer size so we're making less trips to cassandra
-                .buffer( graphFig.getScanPageSize() ).flatMap( new Func1<List<MarkedEdge>, Observable<MarkedEdge>>() {
-                    @Override
-                    public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
+                .buffer( graphFig.getScanPageSize() ).flatMap( markedEdges -> {
 
-                        LOG.debug( "Batching {} edges for node {} for deletion", markedEdges.size(), node );
+                    LOG.debug( "Batching {} edges for node {} for deletion", markedEdges.size(), node );
 
-                        final MutationBatch batch = keyspace.prepareMutationBatch();
+                    final MutationBatch batch = keyspace.prepareMutationBatch();
 
-                        Set<TargetPair> sourceNodes = new HashSet<>( markedEdges.size() );
-                        Set<TargetPair> targetNodes = new HashSet<>( markedEdges.size() );
+                    Set<TargetPair> sourceNodes = new HashSet<>( markedEdges.size() );
+                    Set<TargetPair> targetNodes = new HashSet<>( markedEdges.size() );
 
-                        for ( MarkedEdge edge : markedEdges ) {
+                    for ( MarkedEdge edge : markedEdges ) {
 
-                            //delete the newest edge <= the version on the node delete
+                        //delete the newest edge <= the version on the node delete
 
-                            //we use the version specified on the delete purposefully.  If these edges are re-written
-                            //at a greater time we want them to exit
-                            batch.mergeShallow( storageSerialization.deleteEdge( scope, edge, eventTimestamp ) );
+                        //we use the version specified on the delete purposefully.  If these edges are re-written
+                        //at a greater time we want them to exist
+                        batch.mergeShallow( storageSerialization.deleteEdge( scope, edge, eventTimestamp ) );
 
-                            sourceNodes.add( new TargetPair( edge.getSourceNode(), edge.getType() ) );
-                            targetNodes.add( new TargetPair( edge.getTargetNode(), edge.getType() ) );
-                        }
+                        sourceNodes.add( new TargetPair( edge.getSourceNode(), edge.getType() ) );
+                        targetNodes.add( new TargetPair( edge.getTargetNode(), edge.getType() ) );
+                    }
 
-                        try {
-                            batch.execute();
-                        }
-                        catch ( ConnectionException e ) {
-                            throw new RuntimeException( "Unable to connect to casandra", e );
-                        }
+                    try {
+                        batch.execute();
+                    }
+                    catch ( ConnectionException e ) {
+                        throw new RuntimeException( "Unable to connect to casandra", e );
+                    }
 
-                        //now  delete meta data
+                    //now  delete meta data
 
 
-                        //delete both the source and target meta data in parallel for the edge we deleted in the
-                        // previous step
-                        //if nothing else is using them.  We purposefully do not schedule them on a new scheduler
-                        //we want them running on the i/o thread from the Observable emitting all the edges
+                    //delete both the source and target meta data in parallel for the edge we deleted in the
+                    // previous step
+                    //if nothing else is using them.  We purposefully do not schedule them on a new scheduler
+                    //we want them running on the i/o thread from the Observable emitting all the edges
 
-                        //
-                        LOG.debug( "About to audit {} source types", sourceNodes.size() );
+                    //
+                    LOG.debug( "About to audit {} source types", sourceNodes.size() );
 
-                        Observable<Integer> sourceMetaCleanup =
-                                Observable.from( sourceNodes ).flatMap( new Func1<TargetPair, Observable<Integer>>() {
-                                    @Override
-                                    public Observable<Integer> call( final TargetPair targetPair ) {
-                                        return edgeMetaRepair
-                                                .repairSources( scope, targetPair.id, targetPair.edgeType, maxVersion );
-                                    }
-                                } ).last();
+                    Observable<Integer> sourceMetaCleanup =
+                            Observable.from( sourceNodes ).flatMap( targetPair -> edgeMetaRepair
+                                    .repairSources( scope, targetPair.id, targetPair.edgeType, maxVersion ) ).last();
 
 
-                        LOG.debug( "About to audit {} target types", targetNodes.size() );
+                    LOG.debug( "About to audit {} target types", targetNodes.size() );
 
-                        Observable<Integer> targetMetaCleanup =
-                                Observable.from( targetNodes ).flatMap( new Func1<TargetPair, Observable<Integer>>() {
-                                    @Override
-                                    public Observable<Integer> call( final TargetPair targetPair ) {
-                                        return edgeMetaRepair
-                                                .repairTargets( scope, targetPair.id, targetPair.edgeType, maxVersion );
-                                    }
-                                } ).last();
+                    Observable<Integer> targetMetaCleanup =
+                            Observable.from( targetNodes ).flatMap( targetPair -> edgeMetaRepair
+                                    .repairTargets( scope, targetPair.id, targetPair.edgeType, maxVersion ) ).last();
 
 
-                        //run both the source/target edge type cleanup, then proceed
-                        return Observable.merge( sourceMetaCleanup, targetMetaCleanup ).lastOrDefault( null )
-                                         .flatMap( new Func1<Integer, Observable<MarkedEdge>>() {
-                                             @Override
-                                             public Observable<MarkedEdge> call( final Integer integer ) {
-                                                 return Observable.from( markedEdges );
-                                             }
-                                         } );
-                    }
+                    //run both the source/target edge type cleanup, then proceed
+                    return Observable.merge( sourceMetaCleanup, targetMetaCleanup ).lastOrDefault( null )
+                                     .flatMap( new Func1<Integer, Observable<MarkedEdge>>() {
+                                         @Override
+                                         public Observable<MarkedEdge> call( final Integer integer ) {
+                                             return Observable.from( markedEdges );
+                                         }
+                                     } );
                 } );
     }