You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/05/27 19:28:42 UTC
[04/19] incubator-usergrid git commit: [USERGRID-608] Added initial
version of delete flow
[USERGRID-608] Added initial version of delete flow
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ed317b29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ed317b29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ed317b29
Branch: refs/heads/two-dot-o-dev
Commit: ed317b295d96c150693d441ba7d306d0ef3ff40b
Parents: 50f8a56
Author: GERey <gr...@apigee.com>
Authored: Fri May 15 12:23:28 2015 -0700
Committer: GERey <gr...@apigee.com>
Committed: Fri May 15 12:23:28 2015 -0700
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 84 ++++++++++++++------
.../corepersistence/CpEntityManagerFactory.java | 6 +-
.../asyncevents/EventBuilderImpl.java | 13 ++-
.../corepersistence/index/IndexService.java | 2 +-
.../corepersistence/index/IndexServiceImpl.java | 13 ++-
.../index/ApplicationEntityIndex.java | 21 ++++-
.../impl/EsApplicationEntityIndexImpl.java | 10 +++
7 files changed, 112 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 9430c4e..407ae94 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -65,6 +65,8 @@ import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsE
import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.index.query.CounterResolution;
import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.persistence.map.MapManager;
@@ -167,6 +169,8 @@ public class CpEntityManager implements EntityManager {
private PipelineBuilderFactory pipelineBuilderFactory;
+ private final GraphManagerFactory graphManagerFactory;
+
private boolean skipAggregateCounters;
private MetricsFactory metricsFactory;
private Timer aggCounterTimer;
@@ -207,7 +211,8 @@ public class CpEntityManager implements EntityManager {
*/
public CpEntityManager( final CassandraService cass, final CounterUtils counterUtils, final AsyncEventService indexService, final ManagerCache managerCache,
final MetricsFactory metricsFactory, final EntityManagerFig entityManagerFig,
- final PipelineBuilderFactory pipelineBuilderFactory , final UUID applicationId ) {
+ final PipelineBuilderFactory pipelineBuilderFactory ,
+ final GraphManagerFactory graphManagerFactory,final UUID applicationId ) {
this.entityManagerFig = entityManagerFig;
@@ -217,7 +222,9 @@ public class CpEntityManager implements EntityManager {
Preconditions.checkNotNull( applicationId, "applicationId must not be null" );
Preconditions.checkNotNull( indexService, "indexService must not be null" );
Preconditions.checkNotNull( pipelineBuilderFactory, "pipelineBuilderFactory must not be null" );
+ Preconditions.checkNotNull( graphManagerFactory, "graphManagerFactory must not be null" );
this.pipelineBuilderFactory = pipelineBuilderFactory;
+ this.graphManagerFactory = graphManagerFactory;
this.managerCache = managerCache;
@@ -407,10 +414,10 @@ public class CpEntityManager implements EntityManager {
if(entity == null) {
return null;
}
- Class clazz = Schema.getDefaultSchema().getEntityClass(entity.getId().getType());
+ Class clazz = Schema.getDefaultSchema().getEntityClass( entity.getId().getType() );
- Entity oldFormatEntity = EntityFactory.newEntity(entity.getId().getUuid(), entity.getId().getType(), clazz);
- oldFormatEntity.setProperties(CpEntityMapUtils.toMap(entity));
+ Entity oldFormatEntity = EntityFactory.newEntity( entity.getId().getUuid(), entity.getId().getType(), clazz );
+ oldFormatEntity.setProperties( CpEntityMapUtils.toMap( entity ) );
return oldFormatEntity;
}
@@ -615,47 +622,72 @@ public class CpEntityManager implements EntityManager {
}
+ /**
+ * There are a series of steps that are kicked off by a delete
+ * 1. Mark the entity in the entity collection manager as deleted
+ * 2. Mark entity as deleted in the graph
+ * 3. Kick off async process
+ * 4. Delete all entity documents out of elasticsearch.
+ * 5. Compact Graph so that it deletes the marked values.
+ * 6. Delete entity from cassandra using the map manager.
+ *
+ * @param entityRef an entity reference
+ *
+ * @throws Exception
+ */
@Override
public void delete( EntityRef entityRef ) throws Exception {
- deleteAsync( entityRef ).toBlocking().lastOrDefault( null );
- //delete from our UUID index
- MapManager mm = getMapManagerForTypes();
- mm.delete( entityRef.getUuid().toString() );
+ //TODO: since we want the user to mark it and we sweep it later. It should be marked by the graph manager here.
+ //Step 1 & 2 Currently to block so we ensure that marking is done immediately
+ //If this returns null then nothing was marked null so the entity doesn't exist
+ markEntity( entityRef ).toBlocking().lastOrDefault( null );
- }
+ //TODO: figure out how to return async call to service tier? Do I not need to?
+ //Step 3
+ deleteAsync( entityRef );
+ }
- private Observable deleteAsync( EntityRef entityRef ) throws Exception {
+ /**
+ * Marks entity for deletion in entity collection manager and graph.
+ * Convert this method to return a list of observables that we can crunch through on return.
+ * Returns merged obversable that will mark the edges in the ecm and the graph manager.
+ * @param entityRef
+ * @return
+ */
+ private Observable markEntity(EntityRef entityRef){
if(applicationScope == null || entityRef == null){
return Observable.empty();
}
+ GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( applicationScope );
Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
- // if ( !UUIDUtils.isTimeBased( entityId.getUuid() ) ) {
- // throw new IllegalArgumentException(
- // "Entity Id " + entityId.getType() + ":"+ entityId.getUuid() +" uuid not time based");
- // }
+ //Step 1 & 2 of delete
+ return ecm.mark( entityId ).mergeWith( gm.markNode( entityId, entityRef.getUuid().timestamp() ) );
- org.apache.usergrid.persistence.model.entity.Entity entity =
- load(entityId);
+ }
- if ( entity != null ) {
+ /**
+ * 4. Delete all entity documents out of elasticsearch.
+ * 5. Compact Graph so that it deletes the marked values.
+ * 6. Delete entity from cassandra using the map manager.
+ **/
+ private void deleteAsync( EntityRef entityRef ) throws Exception {
- decrementEntityCollection( Schema.defaultCollectionName( entityId.getType() ) );
- // and finally...
+ Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
- //delete it asynchronously
- indexService.queueEntityDelete( applicationScope, entityId );
+ //Step 4 && 5
+ indexService.queueEntityDelete( applicationScope, entityId );
+
+ //Step 6
+ //delete from our UUID index
+ MapManager mm = getMapManagerForTypes();
+ mm.delete( entityRef.getUuid().toString() );
- return ecm.mark( entityId );
- }
- else {
- return Observable.empty();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index e796545..918d3d4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -62,6 +62,7 @@ import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsExcept
import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
@@ -126,6 +127,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
private final MetricsFactory metricsFactory;
private final AsyncEventService indexService;
private final PipelineBuilderFactory pipelineBuilderFactory;
+ private final GraphManagerFactory graphManagerFactory;
public CpEntityManagerFactory( final CassandraService cassandraService, final CounterUtils counterUtils,
final Injector injector ) {
@@ -140,6 +142,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
this.metricsFactory = injector.getInstance( MetricsFactory.class );
this.indexService = injector.getInstance( AsyncEventService.class );
this.pipelineBuilderFactory = injector.getInstance( PipelineBuilderFactory.class );
+ this.graphManagerFactory = injector.getInstance( GraphManagerFactory.class );
this.applicationIdCache = injector.getInstance(ApplicationIdCacheFactory.class).getInstance(
getManagementEntityManager() );
@@ -198,7 +201,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
private EntityManager _getEntityManager( UUID applicationId ) {
- EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache, metricsFactory, entityManagerFig, pipelineBuilderFactory, applicationId );
+ EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache,
+ metricsFactory, entityManagerFig, pipelineBuilderFactory, graphManagerFactory, applicationId );
return em;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index d678a18..4e83e0b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -115,6 +115,9 @@ public class EventBuilderImpl implements EventBuilder {
}
+ //Does the queue entityDelete mark the entity then immediately does to the deleteEntityIndex. seems like
+ //it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter?
+
@Override
public EntityDeleteResults queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
log.debug( "Deleting entity id from index in app scope {} with entityId {} }", applicationScope, entityId );
@@ -125,19 +128,25 @@ public class EventBuilderImpl implements EventBuilder {
//needs get versions here.
+
//TODO: change this to be an observable
//so we get these versions and loop through them until we find the MvccLogEntry that is marked as delete.
//TODO: evauluate this to possibly be an observable to pass to the nextmethod.
MvccLogEntry mostRecentlyMarked = ecm.getVersions( entityId ).toBlocking()
- .first( mvccLogEntry -> mvccLogEntry.getState()== MvccLogEntry.State.DELETED );
+ .firstOrDefault( null,
+ mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED );
+ //If there is nothing marked then we shouldn't return any results.
+ //TODO: evaluate if we want to return null or return empty observable when we don't have any results marked as deleted.
+ if(mostRecentlyMarked == null)
+ return null;
//observable of index operation messages
//this method will need the most recent version.
//When we go to compact the graph make sure you turn on the debugging mode for the deleted nodes so
//we can verify that we mark them. That said that part seems kinda done. as we also delete the mvcc buffers.
final Observable<IndexOperationMessage> edgeObservable =
- indexService.deleteEntityIndexes( applicationScope, entityId,mostRecentlyMarked.getVersion() );
+ indexService.deleteEntityIndexes( applicationScope, entityId, mostRecentlyMarked.getVersion() );
//observable of entries as the batches are deleted
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
index 47601fb..54eb464 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
@@ -81,7 +81,7 @@ public interface IndexService {
* @return
*/
Observable<IndexOperationMessage> deleteEntityIndexes(final ApplicationScope applicationScope, final Id entityId,
- final UUID version);
+ final UUID markedVersion);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 8700c9b..7d7d0d9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -218,7 +218,7 @@ public class IndexServiceImpl implements IndexService {
@Override
public Observable<IndexOperationMessage> deleteEntityIndexes( final ApplicationScope applicationScope,
- final Id entityId, final UUID version ) {
+ final Id entityId, final UUID markedVersion ) {
//bootstrap the lower modules from their caches
final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
@@ -238,7 +238,10 @@ public class IndexServiceImpl implements IndexService {
final Observable<IndexEdge> observable = Observable.merge( sourceEdgesToIndex, targetSizes);
//do our observable for batching
//try to send a whole batch if we can
- version.
+
+
+
+ //loop through candidateResults and deindex every single result that comeback.
//do our observable for batching
//try to send a whole batch if we can
@@ -249,7 +252,11 @@ public class IndexServiceImpl implements IndexService {
//collect results into a single batch
.collect( () -> ei.createBatch(), ( batch, indexEdge ) -> {
//logger.debug( "adding edge {} to batch for entity {}", indexEdge, entity );
- batch.deindex( indexEdge, entityId, version );
+ //TODO: refactor into stages of observable, also need a loop to get entities until we recieve nothing back.
+ CandidateResults crs = ei.getAllEntityVersionBeforeMark( entityId, markedVersion, 1000, 0 );
+ for(CandidateResult cr: crs){
+ batch.deindex( indexEdge, cr);
+ }
} )
//return the future from the batch execution
.flatMap( batch -> batch.execute() ) );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
index 20c3f12..0e0e033 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.persistence.index;
+import java.util.UUID;
+
import org.apache.usergrid.persistence.model.entity.Id;
import rx.Observable;
@@ -49,15 +51,26 @@ public interface ApplicationEntityIndex {
/**
- * Same as search, just iterates all documents that match the index edge exactly
- * @param edge
- * @param limit
- * @param offset
+ * Same as search, just iterates all documents that match the index edge exactly.
+ * @param edge The edge to search on
+ * @param entityId The entity that the searchEdge is connected to.
+ * @param limit The limit of the values to return per search.
+ * @param offset The offset to page the query on.
* @return
*/
CandidateResults getAllEdgeDocuments(final IndexEdge edge, final Id entityId, final int limit, final int offset);
/**
+ * Returns all entity documents that match the entityId and come before the marked version
+ * @param entityId The entityId to match when searching
+ * @param markedVersion The version that has been marked for deletion. All version before this one must be deleted.
+ * @param limit The limit of the values to return per search.
+ * @param offset The offset to page the query on.
+ * @return
+ */
+ CandidateResults getAllEntityVersionBeforeMark(final Id entityId, final UUID markedVersion ,final int limit, final int offset);
+
+ /**
* delete all application records
* @return
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index 2256769..2709569 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.index.impl;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ListenableActionFuture;
@@ -40,6 +41,8 @@ import org.elasticsearch.search.SearchHits;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.lang.NotImplementedException;
+
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -221,6 +224,13 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
}
+ @Override
+ public CandidateResults getAllEntityVersionBeforeMark( final Id entityId, final UUID markedVersion, final int limit,
+ final int offset ) {
+ throw new NotImplementedException( "Implement me or else I won't work." );
+ }
+
+
/**
* Completely delete an index.
*/