You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/05/14 22:59:31 UTC

incubator-usergrid git commit: [USERGRID-608] Copied method indexEntity to deleteEntityIndexes and made it a deindex call that requires version. Added version to method signature. Created Observable to get first marked entry.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-608 [created] 50f8a56fd


[USERGRID-608] Copied method indexEntity to deleteEntityIndexes and made it a deindex call that requires version.
Added version to method signature.
Created Observable to get first marked entry.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/50f8a56f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/50f8a56f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/50f8a56f

Branch: refs/heads/USERGRID-608
Commit: 50f8a56fdfa558f9204c348bec59c938347c7485
Parents: 79aea6a
Author: GERey <gr...@apigee.com>
Authored: Thu May 14 13:59:30 2015 -0700
Committer: GERey <gr...@apigee.com>
Committed: Thu May 14 13:59:30 2015 -0700

----------------------------------------------------------------------
 .../asyncevents/EventBuilderImpl.java           | 13 +++++-
 .../corepersistence/index/IndexService.java     |  5 ++-
 .../corepersistence/index/IndexServiceImpl.java | 42 +++++++++++++++++---
 3 files changed, 53 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/50f8a56f/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index c0d82d2..d678a18 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -123,10 +123,21 @@ public class EventBuilderImpl implements EventBuilder {
 
         final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
 
+        //needs get versions here.
+
+        //TODO: change this to be an observable
+        //so we get these versions and loop through them until we find the MvccLogEntry that is marked as delete.
+        //TODO: evauluate this to possibly be an observable to pass to the nextmethod.
+        MvccLogEntry mostRecentlyMarked = ecm.getVersions( entityId ).toBlocking()
+                                             .first( mvccLogEntry -> mvccLogEntry.getState()== MvccLogEntry.State.DELETED );
+
 
         //observable of index operation messages
+        //this method will need the most recent version.
+        //When we go to compact the graph make sure you turn on the debugging mode for the deleted nodes so
+        //we can verify that we mark them. That said that part seems kinda done. as we also delete the mvcc buffers.
         final Observable<IndexOperationMessage> edgeObservable =
-            indexService.deleteEntityIndexes( applicationScope, entityId );
+            indexService.deleteEntityIndexes( applicationScope, entityId,mostRecentlyMarked.getVersion() );
 
 
         //observable of entries as the batches are deleted

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/50f8a56f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
index 30e4dad..47601fb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.corepersistence.index;
 
 
+import java.util.UUID;
+
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.index.IndexEdge;
@@ -78,7 +80,8 @@ public interface IndexService {
      * @param entityId
      * @return
      */
-    Observable<IndexOperationMessage> deleteEntityIndexes(final ApplicationScope applicationScope, final Id entityId);
+    Observable<IndexOperationMessage> deleteEntityIndexes(final ApplicationScope applicationScope, final Id entityId,
+                                                         final UUID version);
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/50f8a56f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index d616090..8700c9b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -21,11 +21,11 @@ package org.apache.usergrid.corepersistence.index;
 
 
 import java.util.Iterator;
+import java.util.UUID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.exception.NotImplementedException;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -41,7 +41,6 @@ import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexEdge;
 import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -219,10 +218,43 @@ public class IndexServiceImpl implements IndexService {
 
     @Override
     public Observable<IndexOperationMessage> deleteEntityIndexes( final ApplicationScope applicationScope,
-                                                                  final Id entityId ) {
+                                                                  final Id entityId, final UUID version ) {
 
-        //TODO query ES and remove this entityId
-        throw new NotImplementedException( "Implement me" );
+        //bootstrap the lower modules from their caches
+        final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
+        final ApplicationEntityIndex ei = entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+        //we always index in the target scope
+        final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId );
+
+        //we may have to index  we're indexing from source->target here
+        final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( edge -> generateScopeFromSource( edge ) );
+
+
+        //we might or might not need to index from target-> source
+        final Observable<IndexEdge> targetSizes = getIndexEdgesAsTarget( gm, entityId );
+
+        //merge the edges together
+        final Observable<IndexEdge> observable = Observable.merge( sourceEdgesToIndex, targetSizes);
+        //do our observable for batching
+        //try to send a whole batch if we can
+        version.
+
+        //do our observable for batching
+        //try to send a whole batch if we can
+        final Observable<IndexOperationMessage>  batches =  observable.buffer( indexFig.getIndexBatchSize() )
+
+            //map into batches based on our buffer size
+            .flatMap( buffer -> Observable.from( buffer )
+                //collect results into a single batch
+                .collect( () -> ei.createBatch(), ( batch, indexEdge ) -> {
+                    //logger.debug( "adding edge {} to batch for entity {}", indexEdge, entity );
+                    batch.deindex( indexEdge, entityId, version );
+                } )
+                    //return the future from the batch execution
+                .flatMap( batch -> batch.execute() ) );
+
+        return ObservableTimer.time( batches, indexTimer );
     }