You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2015/10/27 06:26:12 UTC

usergrid git commit: Make graph read repair delete edges and compact nodes directly in the graph instead of queueing events. Misc. property default changes.

Repository: usergrid
Updated Branches:
  refs/heads/delete-event-updates [created] 70d7a9586


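Make graph read repair delete marked edges and compact deleted nodes directly in the graph instead of queueing async delete events. Also change miscellaneous property defaults (index queue visibility timeout 5000 -> 30000 ms, index worker count 8 -> 16).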


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/70d7a958
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/70d7a958
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/70d7a958

Branch: refs/heads/delete-event-updates
Commit: 70d7a9586ece0f32ec5aa50334cd4d70f440b2c6
Parents: 5eed978
Author: Michael Russo <mi...@gmail.com>
Authored: Mon Oct 26 22:26:07 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Mon Oct 26 22:26:07 2015 -0700

----------------------------------------------------------------------
 .../index/IndexProcessorFig.java                |  4 ++--
 .../read/traverse/AbstractReadGraphFilter.java  | 25 ++++++++++----------
 .../traverse/ReadGraphCollectionFilter.java     |  6 ++---
 .../traverse/ReadGraphConnectionFilter.java     |  6 ++---
 4 files changed, 21 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index ec9b315..7650c62 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -65,14 +65,14 @@ public interface IndexProcessorFig extends GuicyFig {
      * Received messages will remain 'in flight' until they are ack'd(deleted) or this timeout occurs.
      * If the timeout occurs, the messages will become visible again for re-processing.
      */
-    @Default( "5000" ) // 5 seconds
+    @Default( "30000" ) // 30 seconds
     @Key( INDEX_QUEUE_VISIBILITY_TIMEOUT )
     int getIndexQueueVisibilityTimeout();
 
     /**
      * The number of worker threads used to read index write requests from the queue.
      */
-    @Default( "8" )
+    @Default( "16" )
     @Key( ELASTICSEARCH_WORKER_COUNT )
     int getWorkerCount();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 621edd2..9d050c8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -20,10 +20,10 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
@@ -50,16 +50,16 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
     private static final Logger logger = LoggerFactory.getLogger( AbstractReadGraphFilter.class );
 
     private final GraphManagerFactory graphManagerFactory;
-    private final AsyncEventService asyncEventService;
+    private final RxTaskScheduler rxTaskScheduler;
 
 
     /**
      * Create a new instance of our command
      */
     public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory,
-                                    final AsyncEventService asyncEventService ) {
+                                    final RxTaskScheduler rxTaskScheduler) {
         this.graphManagerFactory = graphManagerFactory;
-        this.asyncEventService = asyncEventService;
+        this.rxTaskScheduler = rxTaskScheduler;
     }
 
 
@@ -109,25 +109,26 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
 
 
                 if(isDeleted){
-                    logger.trace( "Edge {} is deleted, queueing the delete event", markedEdge );
-                    asyncEventService.queueDeleteEdge( applicationScope, markedEdge  );
+                    logger.trace("Edge {} is deleted, deleting the edge", markedEdge);
+                    graphManager.deleteEdge(markedEdge).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+                        .subscribe();
                 }
 
                 if(isSourceNodeDeleted){
                     final Id sourceNodeId = markedEdge.getSourceNode();
 
-                    logger.trace( "Edge {} has a deleted source node, queueing the delete entity event for id {}", markedEdge, sourceNodeId );
-
-                    asyncEventService.queueEntityDelete( applicationScope, sourceNodeId );
+                    logger.trace("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId);
+                    graphManager.compactNode(sourceNodeId).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+                        .subscribe();
                 }
 
                 if(isTargetNodeDelete){
 
                     final Id targetNodeId = markedEdge.getTargetNode();
 
-                    logger.trace( "Edge {} has a deleted target node, queueing the delete entity event for id {}", markedEdge, targetNodeId );
-
-                    asyncEventService.queueEntityDelete( applicationScope, targetNodeId );
+                    logger.trace("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId );
+                    graphManager.compactNode(targetNodeId).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+                        .subscribe();
                 }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
index db5a0a8..1d63bc6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
@@ -20,7 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
-import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 
 import com.google.inject.Inject;
@@ -41,8 +41,8 @@ public class ReadGraphCollectionFilter extends AbstractReadGraphFilter {
      * Create a new instance of our command
      */
     @Inject
-    public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final AsyncEventService asyncEventService, @Assisted final String collectionName ) {
-        super( graphManagerFactory, asyncEventService );
+    public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final RxTaskScheduler rxTaskScheduler, @Assisted final String collectionName ) {
+        super( graphManagerFactory, rxTaskScheduler );
         this.collectionName = collectionName;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
index 93e8fd4..efe94db 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
@@ -20,7 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
-import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 
 import com.google.inject.Inject;
@@ -41,8 +41,8 @@ public class ReadGraphConnectionFilter extends AbstractReadGraphFilter {
      * Create a new instance of our command
      */
     @Inject
-    public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory,  final AsyncEventService asyncEventService,  @Assisted final String connectionName ) {
-        super( graphManagerFactory, asyncEventService );
+    public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, final RxTaskScheduler rxTaskScheduler, @Assisted final String connectionName ) {
+        super( graphManagerFactory, rxTaskScheduler );
         this.connectionName = connectionName;
     }