You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/11/06 21:38:05 UTC
[11/50] [abbrv] usergrid git commit: Change read repair to interact
with c* directly and only fire an index operation message to get the ES
document removed from all regions.
Change read repair to interact with Cassandra (c*) directly and only fire an index operation message to get the Elasticsearch (ES) document removed from all regions.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1b43bda3
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1b43bda3
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1b43bda3
Branch: refs/heads/asf-site
Commit: 1b43bda3f801172b0e59f927ff3ae52a559d36cc
Parents: 70d7a95
Author: Michael Russo <mi...@gmail.com>
Authored: Tue Oct 27 10:50:16 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Tue Oct 27 12:56:13 2015 -0700
----------------------------------------------------------------------
.../asyncevents/AsyncEventService.java | 5 ++
.../asyncevents/InMemoryAsyncEventService.java | 5 ++
.../read/traverse/AbstractReadGraphFilter.java | 69 +++++++++++++++++---
.../traverse/ReadGraphCollectionFilter.java | 10 ++-
.../traverse/ReadGraphConnectionFilter.java | 10 ++-
.../impl/stage/NodeDeleteListenerImpl.java | 27 ++++----
6 files changed, 97 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index dcfffcb..dbf8996 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -77,6 +77,11 @@ public interface AsyncEventService extends ReIndexAction {
*/
void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId);
+ /**
+ * Queue the given index operation message.
+ * @param indexOperationMessage the index operation message to queue
+ */
+ void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage );
/**
* current queue depth
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index fc6385c..d8334b3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -105,6 +105,11 @@ public class InMemoryAsyncEventService implements AsyncEventService {
run( results.getCompactedNode() );
}
+ @Override
+ public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage){
+ // Intentionally a no-op: index operation messages are not used by this local, in-memory implementation.
+ }
+
public void index( final ApplicationScope applicationScope, final Id id, final long updatedSince ) {
final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, id, updatedSince );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 9d050c8..89230d7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -20,7 +20,11 @@
package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +46,7 @@ import com.google.common.base.Optional;
import rx.Observable;
+
/**
* Command for reading graph edges
*/
@@ -51,15 +56,21 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
private final GraphManagerFactory graphManagerFactory;
private final RxTaskScheduler rxTaskScheduler;
+ private final EventBuilder eventBuilder;
+ private final AsyncEventService asyncEventService;
/**
* Create a new instance of our command
*/
public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory,
- final RxTaskScheduler rxTaskScheduler) {
+ final RxTaskScheduler rxTaskScheduler,
+ final EventBuilder eventBuilder,
+ final AsyncEventService asyncEventService ) {
this.graphManagerFactory = graphManagerFactory;
this.rxTaskScheduler = rxTaskScheduler;
+ this.eventBuilder = eventBuilder;
+ this.asyncEventService = asyncEventService;
}
@@ -107,28 +118,56 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
final boolean isTargetNodeDelete = markedEdge.isTargetNodeDeleted();
+ if (isDeleted) {
- if(isDeleted){
logger.trace("Edge {} is deleted, deleting the edge", markedEdge);
- graphManager.deleteEdge(markedEdge).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+ final Observable<IndexOperationMessage> indexMessageObservable = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
+
+ indexMessageObservable
+ .compose(applyCollector())
+ .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
.subscribe();
+
}
- if(isSourceNodeDeleted){
- final Id sourceNodeId = markedEdge.getSourceNode();
+ if (isSourceNodeDeleted) {
+ final Id sourceNodeId = markedEdge.getSourceNode();
logger.trace("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId);
- graphManager.compactNode(sourceNodeId).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+
+ final EventBuilderImpl.EntityDeleteResults
+ entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
+
+ entityDeleteResults.getIndexObservable()
+ .compose(applyCollector())
+ .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
.subscribe();
+
+ Observable.merge(entityDeleteResults.getEntitiesDeleted(),
+ entityDeleteResults.getCompactedNode())
+ .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
+ subscribe();
+
}
- if(isTargetNodeDelete){
+ if (isTargetNodeDelete) {
final Id targetNodeId = markedEdge.getTargetNode();
+ logger.trace("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId);
- logger.trace("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId );
- graphManager.compactNode(targetNodeId).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+ final EventBuilderImpl.EntityDeleteResults
+ entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
+
+ entityDeleteResults.getIndexObservable()
+ .compose(applyCollector())
+ .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
.subscribe();
+
+ Observable.merge(entityDeleteResults.getEntitiesDeleted(),
+ entityDeleteResults.getCompactedNode())
+ .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
+ subscribe();
+
}
@@ -202,4 +241,16 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
return cursorEdge;
}
}
+
+ private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector() {
+ // Drops empty messages, ingests the remaining ones into one new IndexOperationMessage, then passes it to asyncEventService.queueIndexOperationMessage.
+ return observable -> observable
+ .filter((IndexOperationMessage msg) -> !msg.isEmpty())
+ .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
+ .doOnNext(indexOperation -> {
+ asyncEventService.queueIndexOperationMessage(indexOperation);
+ });
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
index 1d63bc6..3d7df3b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -41,8 +43,12 @@ public class ReadGraphCollectionFilter extends AbstractReadGraphFilter {
* Create a new instance of our command
*/
@Inject
- public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final RxTaskScheduler rxTaskScheduler, @Assisted final String collectionName ) {
- super( graphManagerFactory, rxTaskScheduler );
+ public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory,
+ final RxTaskScheduler rxTaskScheduler,
+ final EventBuilder eventBuilder,
+ final AsyncEventService asyncEventService,
+ @Assisted final String collectionName ) {
+ super( graphManagerFactory, rxTaskScheduler, eventBuilder, asyncEventService );
this.collectionName = collectionName;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
index efe94db..b2d368b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -41,8 +43,12 @@ public class ReadGraphConnectionFilter extends AbstractReadGraphFilter {
* Create a new instance of our command
*/
@Inject
- public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, final RxTaskScheduler rxTaskScheduler, @Assisted final String connectionName ) {
- super( graphManagerFactory, rxTaskScheduler );
+ public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory,
+ final RxTaskScheduler rxTaskScheduler,
+ final EventBuilder eventBuilder,
+ final AsyncEventService asyncEventService,
+ @Assisted final String connectionName ) {
+ super( graphManagerFactory, rxTaskScheduler, eventBuilder, asyncEventService );
this.connectionName = connectionName;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
index e4eb5fc..343cc77 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
@@ -157,26 +157,26 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
//get all edges pointing to the target node and buffer then into groups for deletion
Observable<MarkedEdge> targetEdges =
- getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null, null ) )
- .subscribeOn( Schedulers.io() ).flatMap( edgeType -> Observable.create( new ObservableIterator<MarkedEdge>( "getTargetEdges" ) {
+ getEdgesTypesToTarget(scope, new SimpleSearchEdgeType(node, null, null))
+ .flatMap(edgeType -> Observable.create(new ObservableIterator<MarkedEdge>("getTargetEdges") {
@Override
protected Iterator<MarkedEdge> getIterator() {
- return storageSerialization.getEdgesToTarget( scope,
- new SimpleSearchByEdgeType( node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) );
+ return storageSerialization.getEdgesToTarget(scope,
+ new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()));
}
- } ) );
+ }));
//get all edges pointing to the source node and buffer them into groups for deletion
Observable<MarkedEdge> sourceEdges =
- getEdgesTypesFromSource( scope, new SimpleSearchEdgeType( node, null, null ) )
- .subscribeOn( Schedulers.io() ).flatMap( edgeType -> Observable.create( new ObservableIterator<MarkedEdge>( "getSourceEdges" ) {
+ getEdgesTypesFromSource(scope, new SimpleSearchEdgeType(node, null, null))
+ .flatMap(edgeType -> Observable.create(new ObservableIterator<MarkedEdge>("getSourceEdges") {
@Override
protected Iterator<MarkedEdge> getIterator() {
- return storageSerialization.getEdgesFromSource( scope,
- new SimpleSearchByEdgeType( node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) );
+ return storageSerialization.getEdgesFromSource(scope,
+ new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()));
}
- } ) );
+ }));
//merge both source and target into 1 observable. We'll need to check them all regardless of order
return Observable.merge( targetEdges, sourceEdges )
@@ -235,12 +235,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
//run both the source/target edge type cleanup, then proceed
return Observable.merge( sourceMetaCleanup, targetMetaCleanup ).lastOrDefault( null )
- .flatMap( new Func1<Integer, Observable<MarkedEdge>>() {
- @Override
- public Observable<MarkedEdge> call( final Integer integer ) {
- return Observable.from( markedEdges );
- }
- } );
+ .flatMap(integer -> Observable.from( markedEdges ));
} );
}