You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2015/10/27 06:26:12 UTC
usergrid git commit: Make the graph read repair directly compact
nodes in graph instead of queueing events. Misc prop file changes.
Repository: usergrid
Updated Branches:
refs/heads/delete-event-updates [created] 70d7a9586
Make the graph read repair directly compact nodes in graph instead of queueing events. Misc prop file changes.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/70d7a958
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/70d7a958
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/70d7a958
Branch: refs/heads/delete-event-updates
Commit: 70d7a9586ece0f32ec5aa50334cd4d70f440b2c6
Parents: 5eed978
Author: Michael Russo <mi...@gmail.com>
Authored: Mon Oct 26 22:26:07 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Mon Oct 26 22:26:07 2015 -0700
----------------------------------------------------------------------
.../index/IndexProcessorFig.java | 4 ++--
.../read/traverse/AbstractReadGraphFilter.java | 25 ++++++++++----------
.../traverse/ReadGraphCollectionFilter.java | 6 ++---
.../traverse/ReadGraphConnectionFilter.java | 6 ++---
4 files changed, 21 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index ec9b315..7650c62 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -65,14 +65,14 @@ public interface IndexProcessorFig extends GuicyFig {
* Received messages will remain 'in flight' until they are ack'd(deleted) or this timeout occurs.
* If the timeout occurs, the messages will become visible again for re-processing.
*/
- @Default( "5000" ) // 5 seconds
+ @Default( "30000" ) // 30 seconds
@Key( INDEX_QUEUE_VISIBILITY_TIMEOUT )
int getIndexQueueVisibilityTimeout();
/**
* The number of worker threads used to read index write requests from the queue.
*/
- @Default( "8" )
+ @Default( "16" )
@Key( ELASTICSEARCH_WORKER_COUNT )
int getWorkerCount();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 621edd2..9d050c8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -20,10 +20,10 @@
package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
@@ -50,16 +50,16 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
private static final Logger logger = LoggerFactory.getLogger( AbstractReadGraphFilter.class );
private final GraphManagerFactory graphManagerFactory;
- private final AsyncEventService asyncEventService;
+ private final RxTaskScheduler rxTaskScheduler;
/**
* Create a new instance of our command
*/
public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory,
- final AsyncEventService asyncEventService ) {
+ final RxTaskScheduler rxTaskScheduler) {
this.graphManagerFactory = graphManagerFactory;
- this.asyncEventService = asyncEventService;
+ this.rxTaskScheduler = rxTaskScheduler;
}
@@ -109,25 +109,26 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
if(isDeleted){
- logger.trace( "Edge {} is deleted, queueing the delete event", markedEdge );
- asyncEventService.queueDeleteEdge( applicationScope, markedEdge );
+ logger.trace("Edge {} is deleted, deleting the edge", markedEdge);
+ graphManager.deleteEdge(markedEdge).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+ .subscribe();
}
if(isSourceNodeDeleted){
final Id sourceNodeId = markedEdge.getSourceNode();
- logger.trace( "Edge {} has a deleted source node, queueing the delete entity event for id {}", markedEdge, sourceNodeId );
-
- asyncEventService.queueEntityDelete( applicationScope, sourceNodeId );
+ logger.trace("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId);
+ graphManager.compactNode(sourceNodeId).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+ .subscribe();
}
if(isTargetNodeDelete){
final Id targetNodeId = markedEdge.getTargetNode();
- logger.trace( "Edge {} has a deleted target node, queueing the delete entity event for id {}", markedEdge, targetNodeId );
-
- asyncEventService.queueEntityDelete( applicationScope, targetNodeId );
+ logger.trace("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId );
+ graphManager.compactNode(targetNodeId).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+ .subscribe();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
index db5a0a8..1d63bc6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
@@ -20,7 +20,7 @@
package org.apache.usergrid.corepersistence.pipeline.read.traverse;
-import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import com.google.inject.Inject;
@@ -41,8 +41,8 @@ public class ReadGraphCollectionFilter extends AbstractReadGraphFilter {
* Create a new instance of our command
*/
@Inject
- public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final AsyncEventService asyncEventService, @Assisted final String collectionName ) {
- super( graphManagerFactory, asyncEventService );
+ public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final RxTaskScheduler rxTaskScheduler, @Assisted final String collectionName ) {
+ super( graphManagerFactory, rxTaskScheduler );
this.collectionName = collectionName;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
index 93e8fd4..efe94db 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
@@ -20,7 +20,7 @@
package org.apache.usergrid.corepersistence.pipeline.read.traverse;
-import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import com.google.inject.Inject;
@@ -41,8 +41,8 @@ public class ReadGraphConnectionFilter extends AbstractReadGraphFilter {
* Create a new instance of our command
*/
@Inject
- public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, final AsyncEventService asyncEventService, @Assisted final String connectionName ) {
- super( graphManagerFactory, asyncEventService );
+ public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, final RxTaskScheduler rxTaskScheduler, @Assisted final String connectionName ) {
+ super( graphManagerFactory, rxTaskScheduler );
this.connectionName = connectionName;
}