You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/05/14 22:59:31 UTC
incubator-usergrid git commit: [USERGRID-608] Copied method
indexEntity to deleteEntityIndexes and made it a deindex call that requires
version. Added version to method signature. Created Observable to get first
marked entry.
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-608 [created] 50f8a56fd
[USERGRID-608] Copied method indexEntity to deleteEntityIndexes and made it a deindex call that requires version.
Added version to method signature.
Created Observable to get first marked entry.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/50f8a56f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/50f8a56f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/50f8a56f
Branch: refs/heads/USERGRID-608
Commit: 50f8a56fdfa558f9204c348bec59c938347c7485
Parents: 79aea6a
Author: GERey <gr...@apigee.com>
Authored: Thu May 14 13:59:30 2015 -0700
Committer: GERey <gr...@apigee.com>
Committed: Thu May 14 13:59:30 2015 -0700
----------------------------------------------------------------------
.../asyncevents/EventBuilderImpl.java | 13 +++++-
.../corepersistence/index/IndexService.java | 5 ++-
.../corepersistence/index/IndexServiceImpl.java | 42 +++++++++++++++++---
3 files changed, 53 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/50f8a56f/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index c0d82d2..d678a18 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -123,10 +123,21 @@ public class EventBuilderImpl implements EventBuilder {
final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
+ //needs get versions here.
+
+ //TODO: change this to be an observable
+ //so we get these versions and loop through them until we find the MvccLogEntry that is marked as delete.
+ //TODO: evauluate this to possibly be an observable to pass to the nextmethod.
+ MvccLogEntry mostRecentlyMarked = ecm.getVersions( entityId ).toBlocking()
+ .first( mvccLogEntry -> mvccLogEntry.getState()== MvccLogEntry.State.DELETED );
+
//observable of index operation messages
+ //this method will need the most recent version.
+ //When we go to compact the graph make sure you turn on the debugging mode for the deleted nodes so
+ //we can verify that we mark them. That said that part seems kinda done. as we also delete the mvcc buffers.
final Observable<IndexOperationMessage> edgeObservable =
- indexService.deleteEntityIndexes( applicationScope, entityId );
+ indexService.deleteEntityIndexes( applicationScope, entityId,mostRecentlyMarked.getVersion() );
//observable of entries as the batches are deleted
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/50f8a56f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
index 30e4dad..47601fb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.corepersistence.index;
+import java.util.UUID;
+
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.index.IndexEdge;
@@ -78,7 +80,8 @@ public interface IndexService {
* @param entityId
* @return
*/
- Observable<IndexOperationMessage> deleteEntityIndexes(final ApplicationScope applicationScope, final Id entityId);
+ Observable<IndexOperationMessage> deleteEntityIndexes(final ApplicationScope applicationScope, final Id entityId,
+ final UUID version);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/50f8a56f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index d616090..8700c9b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -21,11 +21,11 @@ package org.apache.usergrid.corepersistence.index;
import java.util.Iterator;
+import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.usergrid.exception.NotImplementedException;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -41,7 +41,6 @@ import org.apache.usergrid.persistence.index.EntityIndexBatch;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexEdge;
import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.SearchTypes;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -219,10 +218,43 @@ public class IndexServiceImpl implements IndexService {
@Override
public Observable<IndexOperationMessage> deleteEntityIndexes( final ApplicationScope applicationScope,
- final Id entityId ) {
+ final Id entityId, final UUID version ) {
- //TODO query ES and remove this entityId
- throw new NotImplementedException( "Implement me" );
+ //bootstrap the lower modules from their caches
+ final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
+ final ApplicationEntityIndex ei = entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+ //we always index in the target scope
+ final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId );
+
+ //we may have to index we're indexing from source->target here
+ final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( edge -> generateScopeFromSource( edge ) );
+
+
+ //we might or might not need to index from target-> source
+ final Observable<IndexEdge> targetSizes = getIndexEdgesAsTarget( gm, entityId );
+
+ //merge the edges together
+ final Observable<IndexEdge> observable = Observable.merge( sourceEdgesToIndex, targetSizes);
+ //do our observable for batching
+ //try to send a whole batch if we can
+ version.
+
+ //do our observable for batching
+ //try to send a whole batch if we can
+ final Observable<IndexOperationMessage> batches = observable.buffer( indexFig.getIndexBatchSize() )
+
+ //map into batches based on our buffer size
+ .flatMap( buffer -> Observable.from( buffer )
+ //collect results into a single batch
+ .collect( () -> ei.createBatch(), ( batch, indexEdge ) -> {
+ //logger.debug( "adding edge {} to batch for entity {}", indexEdge, entity );
+ batch.deindex( indexEdge, entityId, version );
+ } )
+ //return the future from the batch execution
+ .flatMap( batch -> batch.execute() ) );
+
+ return ObservableTimer.time( batches, indexTimer );
}