You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/10/27 21:41:04 UTC
[1/4] usergrid git commit: Make the graph read repair directly
compact nodes in graph instead of queueing events. Misc prop file changes.
Repository: usergrid
Updated Branches:
refs/heads/2.1-release e018c1e0d -> 26860545e
Make the graph read repair directly compact nodes in graph instead of queueing events. Misc prop file changes.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/70d7a958
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/70d7a958
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/70d7a958
Branch: refs/heads/2.1-release
Commit: 70d7a9586ece0f32ec5aa50334cd4d70f440b2c6
Parents: 5eed978
Author: Michael Russo <mi...@gmail.com>
Authored: Mon Oct 26 22:26:07 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Mon Oct 26 22:26:07 2015 -0700
----------------------------------------------------------------------
.../index/IndexProcessorFig.java | 4 ++--
.../read/traverse/AbstractReadGraphFilter.java | 25 ++++++++++----------
.../traverse/ReadGraphCollectionFilter.java | 6 ++---
.../traverse/ReadGraphConnectionFilter.java | 6 ++---
4 files changed, 21 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index ec9b315..7650c62 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -65,14 +65,14 @@ public interface IndexProcessorFig extends GuicyFig {
* Received messages will remain 'in flight' until they are ack'd(deleted) or this timeout occurs.
* If the timeout occurs, the messages will become visible again for re-processing.
*/
- @Default( "5000" ) // 5 seconds
+ @Default( "30000" ) // 30 seconds
@Key( INDEX_QUEUE_VISIBILITY_TIMEOUT )
int getIndexQueueVisibilityTimeout();
/**
* The number of worker threads used to read index write requests from the queue.
*/
- @Default( "8" )
+ @Default( "16" )
@Key( ELASTICSEARCH_WORKER_COUNT )
int getWorkerCount();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 621edd2..9d050c8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -20,10 +20,10 @@
package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
@@ -50,16 +50,16 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
private static final Logger logger = LoggerFactory.getLogger( AbstractReadGraphFilter.class );
private final GraphManagerFactory graphManagerFactory;
- private final AsyncEventService asyncEventService;
+ private final RxTaskScheduler rxTaskScheduler;
/**
* Create a new instance of our command
*/
public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory,
- final AsyncEventService asyncEventService ) {
+ final RxTaskScheduler rxTaskScheduler) {
this.graphManagerFactory = graphManagerFactory;
- this.asyncEventService = asyncEventService;
+ this.rxTaskScheduler = rxTaskScheduler;
}
@@ -109,25 +109,26 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
if(isDeleted){
- logger.trace( "Edge {} is deleted, queueing the delete event", markedEdge );
- asyncEventService.queueDeleteEdge( applicationScope, markedEdge );
+ logger.trace("Edge {} is deleted, deleting the edge", markedEdge);
+ graphManager.deleteEdge(markedEdge).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+ .subscribe();
}
if(isSourceNodeDeleted){
final Id sourceNodeId = markedEdge.getSourceNode();
- logger.trace( "Edge {} has a deleted source node, queueing the delete entity event for id {}", markedEdge, sourceNodeId );
-
- asyncEventService.queueEntityDelete( applicationScope, sourceNodeId );
+ logger.trace("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId);
+ graphManager.compactNode(sourceNodeId).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+ .subscribe();
}
if(isTargetNodeDelete){
final Id targetNodeId = markedEdge.getTargetNode();
- logger.trace( "Edge {} has a deleted target node, queueing the delete entity event for id {}", markedEdge, targetNodeId );
-
- asyncEventService.queueEntityDelete( applicationScope, targetNodeId );
+ logger.trace("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId );
+ graphManager.compactNode(targetNodeId).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+ .subscribe();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
index db5a0a8..1d63bc6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
@@ -20,7 +20,7 @@
package org.apache.usergrid.corepersistence.pipeline.read.traverse;
-import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import com.google.inject.Inject;
@@ -41,8 +41,8 @@ public class ReadGraphCollectionFilter extends AbstractReadGraphFilter {
* Create a new instance of our command
*/
@Inject
- public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final AsyncEventService asyncEventService, @Assisted final String collectionName ) {
- super( graphManagerFactory, asyncEventService );
+ public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final RxTaskScheduler rxTaskScheduler, @Assisted final String collectionName ) {
+ super( graphManagerFactory, rxTaskScheduler );
this.collectionName = collectionName;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
index 93e8fd4..efe94db 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
@@ -20,7 +20,7 @@
package org.apache.usergrid.corepersistence.pipeline.read.traverse;
-import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import com.google.inject.Inject;
@@ -41,8 +41,8 @@ public class ReadGraphConnectionFilter extends AbstractReadGraphFilter {
* Create a new instance of our command
*/
@Inject
- public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, final AsyncEventService asyncEventService, @Assisted final String connectionName ) {
- super( graphManagerFactory, asyncEventService );
+ public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, final RxTaskScheduler rxTaskScheduler, @Assisted final String connectionName ) {
+ super( graphManagerFactory, rxTaskScheduler );
this.connectionName = connectionName;
}
[2/4] usergrid git commit: Change read repair to interact with c*
directly and only fire and index operation message to get the ES document
removed from all regions.
Posted by to...@apache.org.
Change read repair to interact with c* directly and only fire and index operation message to get the ES document removed from all regions.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1b43bda3
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1b43bda3
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1b43bda3
Branch: refs/heads/2.1-release
Commit: 1b43bda3f801172b0e59f927ff3ae52a559d36cc
Parents: 70d7a95
Author: Michael Russo <mi...@gmail.com>
Authored: Tue Oct 27 10:50:16 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Tue Oct 27 12:56:13 2015 -0700
----------------------------------------------------------------------
.../asyncevents/AsyncEventService.java | 5 ++
.../asyncevents/InMemoryAsyncEventService.java | 5 ++
.../read/traverse/AbstractReadGraphFilter.java | 69 +++++++++++++++++---
.../traverse/ReadGraphCollectionFilter.java | 10 ++-
.../traverse/ReadGraphConnectionFilter.java | 10 ++-
.../impl/stage/NodeDeleteListenerImpl.java | 27 ++++----
6 files changed, 97 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index dcfffcb..dbf8996 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -77,6 +77,11 @@ public interface AsyncEventService extends ReIndexAction {
*/
void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId);
+ /**
+ *
+ * @param indexOperationMessage
+ */
+ void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage );
/**
* current queue depth
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index fc6385c..d8334b3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -105,6 +105,11 @@ public class InMemoryAsyncEventService implements AsyncEventService {
run( results.getCompactedNode() );
}
+ @Override
+ public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage){
+ //this is not used locally
+ }
+
public void index( final ApplicationScope applicationScope, final Id id, final long updatedSince ) {
final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, id, updatedSince );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 9d050c8..89230d7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -20,7 +20,11 @@
package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +46,7 @@ import com.google.common.base.Optional;
import rx.Observable;
+
/**
* Command for reading graph edges
*/
@@ -51,15 +56,21 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
private final GraphManagerFactory graphManagerFactory;
private final RxTaskScheduler rxTaskScheduler;
+ private final EventBuilder eventBuilder;
+ private final AsyncEventService asyncEventService;
/**
* Create a new instance of our command
*/
public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory,
- final RxTaskScheduler rxTaskScheduler) {
+ final RxTaskScheduler rxTaskScheduler,
+ final EventBuilder eventBuilder,
+ final AsyncEventService asyncEventService ) {
this.graphManagerFactory = graphManagerFactory;
this.rxTaskScheduler = rxTaskScheduler;
+ this.eventBuilder = eventBuilder;
+ this.asyncEventService = asyncEventService;
}
@@ -107,28 +118,56 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
final boolean isTargetNodeDelete = markedEdge.isTargetNodeDeleted();
+ if (isDeleted) {
- if(isDeleted){
logger.trace("Edge {} is deleted, deleting the edge", markedEdge);
- graphManager.deleteEdge(markedEdge).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+ final Observable<IndexOperationMessage> indexMessageObservable = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
+
+ indexMessageObservable
+ .compose(applyCollector())
+ .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
.subscribe();
+
}
- if(isSourceNodeDeleted){
- final Id sourceNodeId = markedEdge.getSourceNode();
+ if (isSourceNodeDeleted) {
+ final Id sourceNodeId = markedEdge.getSourceNode();
logger.trace("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId);
- graphManager.compactNode(sourceNodeId).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+
+ final EventBuilderImpl.EntityDeleteResults
+ entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
+
+ entityDeleteResults.getIndexObservable()
+ .compose(applyCollector())
+ .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
.subscribe();
+
+ Observable.merge(entityDeleteResults.getEntitiesDeleted(),
+ entityDeleteResults.getCompactedNode())
+ .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
+ subscribe();
+
}
- if(isTargetNodeDelete){
+ if (isTargetNodeDelete) {
final Id targetNodeId = markedEdge.getTargetNode();
+ logger.trace("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId);
- logger.trace("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId );
- graphManager.compactNode(targetNodeId).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+ final EventBuilderImpl.EntityDeleteResults
+ entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
+
+ entityDeleteResults.getIndexObservable()
+ .compose(applyCollector())
+ .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
.subscribe();
+
+ Observable.merge(entityDeleteResults.getEntitiesDeleted(),
+ entityDeleteResults.getCompactedNode())
+ .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
+ subscribe();
+
}
@@ -202,4 +241,16 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
return cursorEdge;
}
}
+
+ private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector() {
+
+ return observable -> observable
+ .filter((IndexOperationMessage msg) -> !msg.isEmpty())
+ .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
+ .doOnNext(indexOperation -> {
+ asyncEventService.queueIndexOperationMessage(indexOperation);
+ });
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
index 1d63bc6..3d7df3b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -41,8 +43,12 @@ public class ReadGraphCollectionFilter extends AbstractReadGraphFilter {
* Create a new instance of our command
*/
@Inject
- public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final RxTaskScheduler rxTaskScheduler, @Assisted final String collectionName ) {
- super( graphManagerFactory, rxTaskScheduler );
+ public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory,
+ final RxTaskScheduler rxTaskScheduler,
+ final EventBuilder eventBuilder,
+ final AsyncEventService asyncEventService,
+ @Assisted final String collectionName ) {
+ super( graphManagerFactory, rxTaskScheduler, eventBuilder, asyncEventService );
this.collectionName = collectionName;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
index efe94db..b2d368b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -41,8 +43,12 @@ public class ReadGraphConnectionFilter extends AbstractReadGraphFilter {
* Create a new instance of our command
*/
@Inject
- public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, final RxTaskScheduler rxTaskScheduler, @Assisted final String connectionName ) {
- super( graphManagerFactory, rxTaskScheduler );
+ public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory,
+ final RxTaskScheduler rxTaskScheduler,
+ final EventBuilder eventBuilder,
+ final AsyncEventService asyncEventService,
+ @Assisted final String connectionName ) {
+ super( graphManagerFactory, rxTaskScheduler, eventBuilder, asyncEventService );
this.connectionName = connectionName;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
index e4eb5fc..343cc77 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
@@ -157,26 +157,26 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
//get all edges pointing to the target node and buffer then into groups for deletion
Observable<MarkedEdge> targetEdges =
- getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null, null ) )
- .subscribeOn( Schedulers.io() ).flatMap( edgeType -> Observable.create( new ObservableIterator<MarkedEdge>( "getTargetEdges" ) {
+ getEdgesTypesToTarget(scope, new SimpleSearchEdgeType(node, null, null))
+ .flatMap(edgeType -> Observable.create(new ObservableIterator<MarkedEdge>("getTargetEdges") {
@Override
protected Iterator<MarkedEdge> getIterator() {
- return storageSerialization.getEdgesToTarget( scope,
- new SimpleSearchByEdgeType( node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) );
+ return storageSerialization.getEdgesToTarget(scope,
+ new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()));
}
- } ) );
+ }));
//get all edges pointing to the source node and buffer them into groups for deletion
Observable<MarkedEdge> sourceEdges =
- getEdgesTypesFromSource( scope, new SimpleSearchEdgeType( node, null, null ) )
- .subscribeOn( Schedulers.io() ).flatMap( edgeType -> Observable.create( new ObservableIterator<MarkedEdge>( "getSourceEdges" ) {
+ getEdgesTypesFromSource(scope, new SimpleSearchEdgeType(node, null, null))
+ .flatMap(edgeType -> Observable.create(new ObservableIterator<MarkedEdge>("getSourceEdges") {
@Override
protected Iterator<MarkedEdge> getIterator() {
- return storageSerialization.getEdgesFromSource( scope,
- new SimpleSearchByEdgeType( node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) );
+ return storageSerialization.getEdgesFromSource(scope,
+ new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()));
}
- } ) );
+ }));
//merge both source and target into 1 observable. We'll need to check them all regardless of order
return Observable.merge( targetEdges, sourceEdges )
@@ -235,12 +235,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
//run both the source/target edge type cleanup, then proceed
return Observable.merge( sourceMetaCleanup, targetMetaCleanup ).lastOrDefault( null )
- .flatMap( new Func1<Integer, Observable<MarkedEdge>>() {
- @Override
- public Observable<MarkedEdge> call( final Integer integer ) {
- return Observable.from( markedEdges );
- }
- } );
+ .flatMap(integer -> Observable.from( markedEdges ));
} );
}
[4/4] usergrid git commit: Merge branch 'delete-event-updates' into
2.1-release
Posted by to...@apache.org.
Merge branch 'delete-event-updates' into 2.1-release
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/26860545
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/26860545
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/26860545
Branch: refs/heads/2.1-release
Commit: 26860545ece19361569370bc63496a01bb48738d
Parents: e018c1e 76476f1
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Oct 27 14:40:23 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Oct 27 14:40:23 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 68 ++++++++++--------
.../asyncevents/AsyncEventService.java | 5 ++
.../asyncevents/InMemoryAsyncEventService.java | 5 ++
.../index/IndexProcessorFig.java | 13 +++-
.../read/traverse/AbstractReadGraphFilter.java | 72 +++++++++++++++++---
.../traverse/ReadGraphCollectionFilter.java | 10 ++-
.../traverse/ReadGraphConnectionFilter.java | 10 ++-
.../impl/stage/NodeDeleteListenerImpl.java | 27 +++-----
8 files changed, 150 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
[3/4] usergrid git commit: Updates the message flow to allow for
multiple processor threads per SQS take thread
Posted by to...@apache.org.
Updates the message flow to allow for multiple processor threads per SQS take thread
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/76476f17
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/76476f17
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/76476f17
Branch: refs/heads/2.1-release
Commit: 76476f17cf8e8be6f01660db3d21110eda8247f5
Parents: 1b43bda
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Oct 27 14:35:34 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Oct 27 14:35:34 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 68 +++++++++++---------
.../index/IndexProcessorFig.java | 13 +++-
2 files changed, 51 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/76476f17/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index d93e304..6b9abbc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -679,35 +679,45 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
while ( true );
}
- } )
- //this won't block our read loop, just reads and proceeds
- .map( messages -> {
- if ( messages == null || messages.size() == 0 ) {
- return null;
- }
-
- try {
- List<IndexEventResult> indexEventResults = callEventHandlers( messages );
- List<QueueMessage> messagesToAck = submitToIndex( indexEventResults );
- if ( messagesToAck == null || messagesToAck.size() == 0 ) {
- logger.error( "No messages came back from the queue operation should have seen "
- + messages.size(), messages );
- return messagesToAck;
- }
- if ( messagesToAck.size() < messages.size() ) {
- logger.error( "Missing messages from queue post operation", messages,
- messagesToAck );
- }
- //ack each message, but only if we didn't error.
- ack( messagesToAck );
- return messagesToAck;
- }
- catch ( Exception e ) {
- logger.error( "failed to ack messages to sqs", e );
- return null;
- //do not rethrow so we can process all of them
- }
- } );
+ } ) //this won't block our read loop, just reads and proceeds
+ .flatMap( sqsMessages -> {
+
+ //do this on a different schedule, and introduce concurrency with flatmap for faster processing
+ return Observable.just( sqsMessages )
+
+ .map( messages -> {
+ if ( messages == null || messages.size() == 0 ) {
+ return null;
+ }
+
+ try {
+ List<IndexEventResult> indexEventResults =
+ callEventHandlers( messages );
+ List<QueueMessage> messagesToAck =
+ submitToIndex( indexEventResults );
+ if ( messagesToAck == null || messagesToAck.size() == 0 ) {
+ logger.error(
+ "No messages came back from the queue operation should "
+ + "have seen "
+ + messages.size(), messages );
+ return messagesToAck;
+ }
+ if ( messagesToAck.size() < messages.size() ) {
+ logger.error( "Missing messages from queue post operation",
+ messages, messagesToAck );
+ }
+ //ack each message, but only if we didn't error.
+ ack( messagesToAck );
+ return messagesToAck;
+ }
+ catch ( Exception e ) {
+ logger.error( "failed to ack messages to sqs", e );
+ return null;
+ //do not rethrow so we can process all of them
+ }
+ } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
+ //end flatMap
+ }, indexProcessorFig.getEventConcurrencyFactor() );
//start in the background
http://git-wip-us.apache.org/repos/asf/usergrid/blob/76476f17/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 7650c62..9d02717 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -36,6 +36,8 @@ public interface IndexProcessorFig extends GuicyFig {
String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count";
+ String EVENT_CONCURRENCY_FACTOR = "event.concurrency.factor";
+
String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
String INDEX_QUEUE_READ_TIMEOUT = "elasticsearch.queue_read_timeout";
@@ -70,9 +72,18 @@ public interface IndexProcessorFig extends GuicyFig {
int getIndexQueueVisibilityTimeout();
/**
+ * The number of worker threads used when handing off messages from the SQS thread
+ */
+ @Default( "20" )
+ @Key( EVENT_CONCURRENCY_FACTOR )
+ int getEventConcurrencyFactor();
+
+
+
+ /**
* The number of worker threads used to read index write requests from the queue.
*/
- @Default( "16" )
+ @Default( "8" )
@Key( ELASTICSEARCH_WORKER_COUNT )
int getWorkerCount();