You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/28 14:53:07 UTC

[02/50] [abbrv] incubator-usergrid git commit: [USERGRID-608] Added initial version of delete flow

[USERGRID-608] Added initial version of delete flow


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ed317b29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ed317b29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ed317b29

Branch: refs/heads/USERGRID-669
Commit: ed317b295d96c150693d441ba7d306d0ef3ff40b
Parents: 50f8a56
Author: GERey <gr...@apigee.com>
Authored: Fri May 15 12:23:28 2015 -0700
Committer: GERey <gr...@apigee.com>
Committed: Fri May 15 12:23:28 2015 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        | 84 ++++++++++++++------
 .../corepersistence/CpEntityManagerFactory.java |  6 +-
 .../asyncevents/EventBuilderImpl.java           | 13 ++-
 .../corepersistence/index/IndexService.java     |  2 +-
 .../corepersistence/index/IndexServiceImpl.java | 13 ++-
 .../index/ApplicationEntityIndex.java           | 21 ++++-
 .../impl/EsApplicationEntityIndexImpl.java      | 10 +++
 7 files changed, 112 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 9430c4e..407ae94 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -65,6 +65,8 @@ import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsE
 import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
 import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
 import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.index.query.CounterResolution;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.map.MapManager;
@@ -167,6 +169,8 @@ public class CpEntityManager implements EntityManager {
 
     private PipelineBuilderFactory pipelineBuilderFactory;
 
+    private final GraphManagerFactory graphManagerFactory;
+
     private boolean skipAggregateCounters;
     private MetricsFactory metricsFactory;
     private Timer aggCounterTimer;
@@ -207,7 +211,8 @@ public class CpEntityManager implements EntityManager {
      */
     public CpEntityManager( final CassandraService cass, final CounterUtils counterUtils, final AsyncEventService indexService, final ManagerCache managerCache,
                             final MetricsFactory metricsFactory, final EntityManagerFig entityManagerFig,
-                            final PipelineBuilderFactory pipelineBuilderFactory , final UUID applicationId ) {
+                            final PipelineBuilderFactory pipelineBuilderFactory ,
+                            final GraphManagerFactory graphManagerFactory,final UUID applicationId ) {
         this.entityManagerFig = entityManagerFig;
 
 
@@ -217,7 +222,9 @@ public class CpEntityManager implements EntityManager {
         Preconditions.checkNotNull( applicationId, "applicationId must not be null" );
         Preconditions.checkNotNull( indexService, "indexService must not be null" );
         Preconditions.checkNotNull( pipelineBuilderFactory, "pipelineBuilderFactory must not be null" );
+        Preconditions.checkNotNull( graphManagerFactory, "graphManagerFactory must not be null" );
         this.pipelineBuilderFactory = pipelineBuilderFactory;
+        this.graphManagerFactory = graphManagerFactory;
 
 
         this.managerCache = managerCache;
@@ -407,10 +414,10 @@ public class CpEntityManager implements EntityManager {
         if(entity == null) {
             return null;
         }
-        Class clazz = Schema.getDefaultSchema().getEntityClass(entity.getId().getType());
+        Class clazz = Schema.getDefaultSchema().getEntityClass( entity.getId().getType() );
 
-        Entity oldFormatEntity = EntityFactory.newEntity(entity.getId().getUuid(), entity.getId().getType(), clazz);
-        oldFormatEntity.setProperties(CpEntityMapUtils.toMap(entity));
+        Entity oldFormatEntity = EntityFactory.newEntity( entity.getId().getUuid(), entity.getId().getType(), clazz );
+        oldFormatEntity.setProperties( CpEntityMapUtils.toMap( entity ) );
 
         return oldFormatEntity;
     }
@@ -615,47 +622,72 @@ public class CpEntityManager implements EntityManager {
     }
 
 
+    /**
+     * There are a series of steps that are kicked off by a delete
+     * 1. Mark the entity in the entity collection manager as deleted
+     * 2. Mark entity as deleted in the graph
+     * 3. Kick off async process
+     * 4. Delete all entity documents out of elasticsearch.
+     * 5. Compact Graph so that it deletes the marked values.
+     * 6. Delete entity from cassandra using the map manager.
+     *
+     * @param entityRef an entity reference
+     *
+     * @throws Exception
+     */
     @Override
     public void delete( EntityRef entityRef ) throws Exception {
-        deleteAsync( entityRef ).toBlocking().lastOrDefault( null );
-        //delete from our UUID index
-        MapManager mm = getMapManagerForTypes();
-        mm.delete( entityRef.getUuid().toString() );
+        //TODO: since we want the user to mark it and we sweep it later. It should be marked by the graph manager here.
+        //Step 1 & 2 Currently to block so we ensure that marking is done immediately
+        //If this returns null then nothing was marked null so the entity doesn't exist
+        markEntity( entityRef ).toBlocking().lastOrDefault( null );
 
-    }
+        //TODO: figure out how to return async call to service tier? Do I not need to?
+        //Step 3
+        deleteAsync( entityRef );
 
+    }
 
-    private Observable deleteAsync( EntityRef entityRef ) throws Exception {
 
+    /**
+     * Marks entity for deletion in entity collection manager and graph.
+     * Convert this method to return a list of observables that we can crunch through on return.
+     * Returns merged obversable that will mark the edges in the ecm and the graph manager.
+     * @param entityRef
+     * @return
+     */
+    private Observable markEntity(EntityRef entityRef){
         if(applicationScope == null || entityRef == null){
             return Observable.empty();
         }
+        GraphManager gm =  graphManagerFactory.createEdgeManager( applicationScope );
 
         EntityCollectionManager ecm = managerCache.getEntityCollectionManager( applicationScope );
 
         Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
 
-        //        if ( !UUIDUtils.isTimeBased( entityId.getUuid() ) ) {
-        //            throw new IllegalArgumentException(
-        //                "Entity Id " + entityId.getType() + ":"+ entityId.getUuid() +" uuid not time based");
-        //        }
+        //Step 1 & 2 of delete
+        return ecm.mark( entityId ).mergeWith( gm.markNode( entityId, entityRef.getUuid().timestamp() ) );
 
-        org.apache.usergrid.persistence.model.entity.Entity entity =
-                load(entityId);
+    }
 
-        if ( entity != null ) {
+    /**
+    * 4. Delete all entity documents out of elasticsearch.
+    * 5. Compact Graph so that it deletes the marked values.
+    * 6. Delete entity from cassandra using the map manager.
+     **/
+    private void deleteAsync( EntityRef entityRef ) throws Exception {
 
-            decrementEntityCollection( Schema.defaultCollectionName( entityId.getType() ) );
-            // and finally...
+        Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
 
-            //delete it asynchronously
-            indexService.queueEntityDelete( applicationScope, entityId );
+        //Step 4 && 5
+        indexService.queueEntityDelete( applicationScope, entityId );
+
+        //Step 6
+        //delete from our UUID index
+        MapManager mm = getMapManagerForTypes();
+        mm.delete( entityRef.getUuid().toString() );
 
-            return ecm.mark( entityId );
-        }
-        else {
-            return Observable.empty();
-        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index e796545..918d3d4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -62,6 +62,7 @@ import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsExcept
 import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
@@ -126,6 +127,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     private final MetricsFactory metricsFactory;
     private final AsyncEventService indexService;
     private final PipelineBuilderFactory pipelineBuilderFactory;
+    private final GraphManagerFactory graphManagerFactory;
 
     public CpEntityManagerFactory( final CassandraService cassandraService, final CounterUtils counterUtils,
                                    final Injector injector ) {
@@ -140,6 +142,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         this.metricsFactory = injector.getInstance( MetricsFactory.class );
         this.indexService = injector.getInstance( AsyncEventService.class );
         this.pipelineBuilderFactory = injector.getInstance( PipelineBuilderFactory.class );
+        this.graphManagerFactory = injector.getInstance( GraphManagerFactory.class );
         this.applicationIdCache = injector.getInstance(ApplicationIdCacheFactory.class).getInstance(
             getManagementEntityManager() );
 
@@ -198,7 +201,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
 
 
     private EntityManager _getEntityManager( UUID applicationId ) {
-        EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache, metricsFactory, entityManagerFig, pipelineBuilderFactory,  applicationId );
+        EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache,
+            metricsFactory, entityManagerFig, pipelineBuilderFactory, graphManagerFactory, applicationId );
         return em;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index d678a18..4e83e0b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -115,6 +115,9 @@ public class EventBuilderImpl implements EventBuilder {
     }
 
 
+    //Does the queue entityDelete mark the entity then immediately does to the deleteEntityIndex. seems like
+    //it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter?
+
     @Override
     public EntityDeleteResults queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
         log.debug( "Deleting entity id from index in app scope {} with entityId {} }", applicationScope, entityId );
@@ -125,19 +128,25 @@ public class EventBuilderImpl implements EventBuilder {
 
         //needs get versions here.
 
+
         //TODO: change this to be an observable
         //so we get these versions and loop through them until we find the MvccLogEntry that is marked as delete.
         //TODO: evauluate this to possibly be an observable to pass to the nextmethod.
         MvccLogEntry mostRecentlyMarked = ecm.getVersions( entityId ).toBlocking()
-                                             .first( mvccLogEntry -> mvccLogEntry.getState()== MvccLogEntry.State.DELETED );
+                                             .firstOrDefault( null,
+                                                 mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED );
 
+        //If there is nothing marked then we shouldn't return any results.
+        //TODO: evaluate if we want to return null or return empty observable when we don't have any results marked as deleted.
+        if(mostRecentlyMarked == null)
+            return null;
 
         //observable of index operation messages
         //this method will need the most recent version.
         //When we go to compact the graph make sure you turn on the debugging mode for the deleted nodes so
         //we can verify that we mark them. That said that part seems kinda done. as we also delete the mvcc buffers.
         final Observable<IndexOperationMessage> edgeObservable =
-            indexService.deleteEntityIndexes( applicationScope, entityId,mostRecentlyMarked.getVersion() );
+            indexService.deleteEntityIndexes( applicationScope, entityId, mostRecentlyMarked.getVersion() );
 
 
         //observable of entries as the batches are deleted

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
index 47601fb..54eb464 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
@@ -81,7 +81,7 @@ public interface IndexService {
      * @return
      */
     Observable<IndexOperationMessage> deleteEntityIndexes(final ApplicationScope applicationScope, final Id entityId,
-                                                         final UUID version);
+                                                         final UUID markedVersion);
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 8700c9b..7d7d0d9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -218,7 +218,7 @@ public class IndexServiceImpl implements IndexService {
 
     @Override
     public Observable<IndexOperationMessage> deleteEntityIndexes( final ApplicationScope applicationScope,
-                                                                  final Id entityId, final UUID version ) {
+                                                                  final Id entityId, final UUID markedVersion ) {
 
         //bootstrap the lower modules from their caches
         final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
@@ -238,7 +238,10 @@ public class IndexServiceImpl implements IndexService {
         final Observable<IndexEdge> observable = Observable.merge( sourceEdgesToIndex, targetSizes);
         //do our observable for batching
         //try to send a whole batch if we can
-        version.
+
+
+
+        //loop through candidateResults and deindex every single result that comeback.
 
         //do our observable for batching
         //try to send a whole batch if we can
@@ -249,7 +252,11 @@ public class IndexServiceImpl implements IndexService {
                 //collect results into a single batch
                 .collect( () -> ei.createBatch(), ( batch, indexEdge ) -> {
                     //logger.debug( "adding edge {} to batch for entity {}", indexEdge, entity );
-                    batch.deindex( indexEdge, entityId, version );
+                    //TODO: refactor into stages of observable, also need a loop to get entities until we recieve nothing back.
+                    CandidateResults crs = ei.getAllEntityVersionBeforeMark( entityId, markedVersion, 1000, 0 );
+                    for(CandidateResult cr: crs){
+                        batch.deindex( indexEdge, cr);
+                    }
                 } )
                     //return the future from the batch execution
                 .flatMap( batch -> batch.execute() ) );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
index 20c3f12..0e0e033 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.persistence.index;
 
 
+import java.util.UUID;
+
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import rx.Observable;
@@ -49,15 +51,26 @@ public interface ApplicationEntityIndex {
 
 
     /**
-     * Same as search, just iterates all documents that match the index edge exactly
-     * @param edge
-     * @param limit
-     * @param offset
+     * Same as search, just iterates all documents that match the index edge exactly.
+     * @param edge The edge to search on
+     * @param entityId The entity that the searchEdge is connected to.
+     * @param limit The limit of the values to return per search.
+     * @param offset The offset to page the query on.
      * @return
      */
     CandidateResults getAllEdgeDocuments(final IndexEdge edge, final Id entityId,  final int limit, final int offset);
 
     /**
+     * Returns all entity documents that match the entityId and come before the marked version
+     * @param entityId The entityId to match when searching
+     * @param markedVersion The version that has been marked for deletion. All version before this one must be deleted.
+     * @param limit The limit of the values to return per search.
+     * @param offset The offset to page the query on.
+     * @return
+     */
+    CandidateResults getAllEntityVersionBeforeMark(final Id entityId, final UUID markedVersion ,final int limit, final int offset);
+
+    /**
      * delete all application records
      * @return
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index 2256769..2709569 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.index.impl;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ListenableActionFuture;
@@ -40,6 +41,8 @@ import org.elasticsearch.search.SearchHits;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang.NotImplementedException;
+
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -221,6 +224,13 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
     }
 
 
+    @Override
+    public CandidateResults getAllEntityVersionBeforeMark( final Id entityId, final UUID markedVersion, final int limit,
+                                                           final int offset ) {
+        throw new NotImplementedException( "Implement me or else I won't work." );
+    }
+
+
     /**
      * Completely delete an index.
      */