You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/11/02 23:55:55 UTC

[04/50] [abbrv] usergrid git commit: Make the graph read repair directly compact nodes in graph instead of queueing events. Misc prop file changes.

Make the graph read repair directly compact nodes in graph instead of queueing events.  Misc prop file changes.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/70d7a958
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/70d7a958
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/70d7a958

Branch: refs/heads/USERGRID-909
Commit: 70d7a9586ece0f32ec5aa50334cd4d70f440b2c6
Parents: 5eed978
Author: Michael Russo <mi...@gmail.com>
Authored: Mon Oct 26 22:26:07 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Mon Oct 26 22:26:07 2015 -0700

----------------------------------------------------------------------
 .../index/IndexProcessorFig.java                |  4 ++--
 .../read/traverse/AbstractReadGraphFilter.java  | 25 ++++++++++----------
 .../traverse/ReadGraphCollectionFilter.java     |  6 ++---
 .../traverse/ReadGraphConnectionFilter.java     |  6 ++---
 4 files changed, 21 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index ec9b315..7650c62 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -65,14 +65,14 @@ public interface IndexProcessorFig extends GuicyFig {
      * Received messages will remain 'in flight' until they are ack'd(deleted) or this timeout occurs.
      * If the timeout occurs, the messages will become visible again for re-processing.
      */
-    @Default( "5000" ) // 5 seconds
+    @Default( "30000" ) // 30 seconds
     @Key( INDEX_QUEUE_VISIBILITY_TIMEOUT )
     int getIndexQueueVisibilityTimeout();
 
     /**
      * The number of worker threads used to read index write requests from the queue.
      */
-    @Default( "8" )
+    @Default( "16" )
     @Key( ELASTICSEARCH_WORKER_COUNT )
     int getWorkerCount();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 621edd2..9d050c8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -20,10 +20,10 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
@@ -50,16 +50,16 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
     private static final Logger logger = LoggerFactory.getLogger( AbstractReadGraphFilter.class );
 
     private final GraphManagerFactory graphManagerFactory;
-    private final AsyncEventService asyncEventService;
+    private final RxTaskScheduler rxTaskScheduler;
 
 
     /**
      * Create a new instance of our command
      */
     public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory,
-                                    final AsyncEventService asyncEventService ) {
+                                    final RxTaskScheduler rxTaskScheduler) {
         this.graphManagerFactory = graphManagerFactory;
-        this.asyncEventService = asyncEventService;
+        this.rxTaskScheduler = rxTaskScheduler;
     }
 
 
@@ -109,25 +109,26 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
 
 
                 if(isDeleted){
-                    logger.trace( "Edge {} is deleted, queueing the delete event", markedEdge );
-                    asyncEventService.queueDeleteEdge( applicationScope, markedEdge  );
+                    logger.trace("Edge {} is deleted, deleting the edge", markedEdge);
+                    graphManager.deleteEdge(markedEdge).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+                        .subscribe();
                 }
 
                 if(isSourceNodeDeleted){
                     final Id sourceNodeId = markedEdge.getSourceNode();
 
-                    logger.trace( "Edge {} has a deleted source node, queueing the delete entity event for id {}", markedEdge, sourceNodeId );
-
-                    asyncEventService.queueEntityDelete( applicationScope, sourceNodeId );
+                    logger.trace("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId);
+                    graphManager.compactNode(sourceNodeId).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+                        .subscribe();
                 }
 
                 if(isTargetNodeDelete){
 
                     final Id targetNodeId = markedEdge.getTargetNode();
 
-                    logger.trace( "Edge {} has a deleted target node, queueing the delete entity event for id {}", markedEdge, targetNodeId );
-
-                    asyncEventService.queueEntityDelete( applicationScope, targetNodeId );
+                    logger.trace("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId );
+                    graphManager.compactNode(targetNodeId).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+                        .subscribe();
                 }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
index db5a0a8..1d63bc6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
@@ -20,7 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
-import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 
 import com.google.inject.Inject;
@@ -41,8 +41,8 @@ public class ReadGraphCollectionFilter extends AbstractReadGraphFilter {
      * Create a new instance of our command
      */
     @Inject
-    public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final AsyncEventService asyncEventService, @Assisted final String collectionName ) {
-        super( graphManagerFactory, asyncEventService );
+    public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final RxTaskScheduler rxTaskScheduler, @Assisted final String collectionName ) {
+        super( graphManagerFactory, rxTaskScheduler );
         this.collectionName = collectionName;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
index 93e8fd4..efe94db 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
@@ -20,7 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
-import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 
 import com.google.inject.Inject;
@@ -41,8 +41,8 @@ public class ReadGraphConnectionFilter extends AbstractReadGraphFilter {
      * Create a new instance of our command
      */
     @Inject
-    public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory,  final AsyncEventService asyncEventService,  @Assisted final String connectionName ) {
-        super( graphManagerFactory, asyncEventService );
+    public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, final RxTaskScheduler rxTaskScheduler, @Assisted final String connectionName ) {
+        super( graphManagerFactory, rxTaskScheduler );
         this.connectionName = connectionName;
     }