You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/10/27 21:41:04 UTC

[1/4] usergrid git commit: Make the graph read repair directly compact nodes in graph instead of queueing events. Misc prop file changes.

Repository: usergrid
Updated Branches:
  refs/heads/2.1-release e018c1e0d -> 26860545e


Make the graph read repair directly compact nodes in graph instead of queueing events.  Misc prop file changes.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/70d7a958
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/70d7a958
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/70d7a958

Branch: refs/heads/2.1-release
Commit: 70d7a9586ece0f32ec5aa50334cd4d70f440b2c6
Parents: 5eed978
Author: Michael Russo <mi...@gmail.com>
Authored: Mon Oct 26 22:26:07 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Mon Oct 26 22:26:07 2015 -0700

----------------------------------------------------------------------
 .../index/IndexProcessorFig.java                |  4 ++--
 .../read/traverse/AbstractReadGraphFilter.java  | 25 ++++++++++----------
 .../traverse/ReadGraphCollectionFilter.java     |  6 ++---
 .../traverse/ReadGraphConnectionFilter.java     |  6 ++---
 4 files changed, 21 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index ec9b315..7650c62 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -65,14 +65,14 @@ public interface IndexProcessorFig extends GuicyFig {
      * Received messages will remain 'in flight' until they are ack'd(deleted) or this timeout occurs.
      * If the timeout occurs, the messages will become visible again for re-processing.
      */
-    @Default( "5000" ) // 5 seconds
+    @Default( "30000" ) // 30 seconds
     @Key( INDEX_QUEUE_VISIBILITY_TIMEOUT )
     int getIndexQueueVisibilityTimeout();
 
     /**
      * The number of worker threads used to read index write requests from the queue.
      */
-    @Default( "8" )
+    @Default( "16" )
     @Key( ELASTICSEARCH_WORKER_COUNT )
     int getWorkerCount();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 621edd2..9d050c8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -20,10 +20,10 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
@@ -50,16 +50,16 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
     private static final Logger logger = LoggerFactory.getLogger( AbstractReadGraphFilter.class );
 
     private final GraphManagerFactory graphManagerFactory;
-    private final AsyncEventService asyncEventService;
+    private final RxTaskScheduler rxTaskScheduler;
 
 
     /**
      * Create a new instance of our command
      */
     public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory,
-                                    final AsyncEventService asyncEventService ) {
+                                    final RxTaskScheduler rxTaskScheduler) {
         this.graphManagerFactory = graphManagerFactory;
-        this.asyncEventService = asyncEventService;
+        this.rxTaskScheduler = rxTaskScheduler;
     }
 
 
@@ -109,25 +109,26 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
 
 
                 if(isDeleted){
-                    logger.trace( "Edge {} is deleted, queueing the delete event", markedEdge );
-                    asyncEventService.queueDeleteEdge( applicationScope, markedEdge  );
+                    logger.trace("Edge {} is deleted, deleting the edge", markedEdge);
+                    graphManager.deleteEdge(markedEdge).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+                        .subscribe();
                 }
 
                 if(isSourceNodeDeleted){
                     final Id sourceNodeId = markedEdge.getSourceNode();
 
-                    logger.trace( "Edge {} has a deleted source node, queueing the delete entity event for id {}", markedEdge, sourceNodeId );
-
-                    asyncEventService.queueEntityDelete( applicationScope, sourceNodeId );
+                    logger.trace("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId);
+                    graphManager.compactNode(sourceNodeId).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+                        .subscribe();
                 }
 
                 if(isTargetNodeDelete){
 
                     final Id targetNodeId = markedEdge.getTargetNode();
 
-                    logger.trace( "Edge {} has a deleted target node, queueing the delete entity event for id {}", markedEdge, targetNodeId );
-
-                    asyncEventService.queueEntityDelete( applicationScope, targetNodeId );
+                    logger.trace("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId );
+                    graphManager.compactNode(targetNodeId).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+                        .subscribe();
                 }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
index db5a0a8..1d63bc6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
@@ -20,7 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
-import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 
 import com.google.inject.Inject;
@@ -41,8 +41,8 @@ public class ReadGraphCollectionFilter extends AbstractReadGraphFilter {
      * Create a new instance of our command
      */
     @Inject
-    public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final AsyncEventService asyncEventService, @Assisted final String collectionName ) {
-        super( graphManagerFactory, asyncEventService );
+    public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final RxTaskScheduler rxTaskScheduler, @Assisted final String collectionName ) {
+        super( graphManagerFactory, rxTaskScheduler );
         this.collectionName = collectionName;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
index 93e8fd4..efe94db 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
@@ -20,7 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
-import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 
 import com.google.inject.Inject;
@@ -41,8 +41,8 @@ public class ReadGraphConnectionFilter extends AbstractReadGraphFilter {
      * Create a new instance of our command
      */
     @Inject
-    public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory,  final AsyncEventService asyncEventService,  @Assisted final String connectionName ) {
-        super( graphManagerFactory, asyncEventService );
+    public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, final RxTaskScheduler rxTaskScheduler, @Assisted final String connectionName ) {
+        super( graphManagerFactory, rxTaskScheduler );
         this.connectionName = connectionName;
     }
 


[2/4] usergrid git commit: Change read repair to interact with c* directly and only fire and index operation message to get the ES document removed from all regions.

Posted by to...@apache.org.
Change read repair to interact with c* directly and only fire and index operation message to get the ES document removed from all regions.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1b43bda3
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1b43bda3
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1b43bda3

Branch: refs/heads/2.1-release
Commit: 1b43bda3f801172b0e59f927ff3ae52a559d36cc
Parents: 70d7a95
Author: Michael Russo <mi...@gmail.com>
Authored: Tue Oct 27 10:50:16 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Tue Oct 27 12:56:13 2015 -0700

----------------------------------------------------------------------
 .../asyncevents/AsyncEventService.java          |  5 ++
 .../asyncevents/InMemoryAsyncEventService.java  |  5 ++
 .../read/traverse/AbstractReadGraphFilter.java  | 69 +++++++++++++++++---
 .../traverse/ReadGraphCollectionFilter.java     | 10 ++-
 .../traverse/ReadGraphConnectionFilter.java     | 10 ++-
 .../impl/stage/NodeDeleteListenerImpl.java      | 27 ++++----
 6 files changed, 97 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index dcfffcb..dbf8996 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -77,6 +77,11 @@ public interface AsyncEventService extends ReIndexAction {
      */
     void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId);
 
+    /**
+     *
+     * @param indexOperationMessage
+     */
+    void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage );
 
     /**
      * current queue depth

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index fc6385c..d8334b3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -105,6 +105,11 @@ public class InMemoryAsyncEventService implements AsyncEventService {
         run( results.getCompactedNode() );
     }
 
+    @Override
+    public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage){
+        //this is not used locally
+    }
+
 
     public void index( final ApplicationScope applicationScope, final Id id, final long updatedSince ) {
         final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, id, updatedSince );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 9d050c8..89230d7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -20,7 +20,11 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,6 +46,7 @@ import com.google.common.base.Optional;
 import rx.Observable;
 
 
+
 /**
  * Command for reading graph edges
  */
@@ -51,15 +56,21 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
 
     private final GraphManagerFactory graphManagerFactory;
     private final RxTaskScheduler rxTaskScheduler;
+    private final EventBuilder eventBuilder;
+    private final AsyncEventService asyncEventService;
 
 
     /**
      * Create a new instance of our command
      */
     public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory,
-                                    final RxTaskScheduler rxTaskScheduler) {
+                                    final RxTaskScheduler rxTaskScheduler,
+                                    final EventBuilder eventBuilder,
+                                    final AsyncEventService asyncEventService ) {
         this.graphManagerFactory = graphManagerFactory;
         this.rxTaskScheduler = rxTaskScheduler;
+        this.eventBuilder = eventBuilder;
+        this.asyncEventService = asyncEventService;
     }
 
 
@@ -107,28 +118,56 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
                 final boolean isTargetNodeDelete = markedEdge.isTargetNodeDeleted();
 
 
+                if (isDeleted) {
 
-                if(isDeleted){
                     logger.trace("Edge {} is deleted, deleting the edge", markedEdge);
-                    graphManager.deleteEdge(markedEdge).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+                    final Observable<IndexOperationMessage> indexMessageObservable = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
+
+                    indexMessageObservable
+                        .compose(applyCollector())
+                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
                         .subscribe();
+
                 }
 
-                if(isSourceNodeDeleted){
-                    final Id sourceNodeId = markedEdge.getSourceNode();
+                if (isSourceNodeDeleted) {
 
+                    final Id sourceNodeId = markedEdge.getSourceNode();
                     logger.trace("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId);
-                    graphManager.compactNode(sourceNodeId).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+
+                    final EventBuilderImpl.EntityDeleteResults
+                        entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
+
+                    entityDeleteResults.getIndexObservable()
+                        .compose(applyCollector())
+                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
                         .subscribe();
+
+                    Observable.merge(entityDeleteResults.getEntitiesDeleted(),
+                        entityDeleteResults.getCompactedNode())
+                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
+                        subscribe();
+
                 }
 
-                if(isTargetNodeDelete){
+                if (isTargetNodeDelete) {
 
                     final Id targetNodeId = markedEdge.getTargetNode();
+                    logger.trace("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId);
 
-                    logger.trace("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId );
-                    graphManager.compactNode(targetNodeId).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+                    final EventBuilderImpl.EntityDeleteResults
+                        entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
+
+                    entityDeleteResults.getIndexObservable()
+                        .compose(applyCollector())
+                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
                         .subscribe();
+
+                    Observable.merge(entityDeleteResults.getEntitiesDeleted(),
+                        entityDeleteResults.getCompactedNode())
+                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
+                        subscribe();
+
                 }
 
 
@@ -202,4 +241,16 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
             return cursorEdge;
         }
     }
+
+    private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector() {
+
+        return observable -> observable
+            .filter((IndexOperationMessage msg) -> !msg.isEmpty())
+            .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
+            .doOnNext(indexOperation -> {
+                asyncEventService.queueIndexOperationMessage(indexOperation);
+            });
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
index 1d63bc6..3d7df3b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 
@@ -41,8 +43,12 @@ public class ReadGraphCollectionFilter extends AbstractReadGraphFilter {
      * Create a new instance of our command
      */
     @Inject
-    public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final RxTaskScheduler rxTaskScheduler, @Assisted final String collectionName ) {
-        super( graphManagerFactory, rxTaskScheduler );
+    public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory,
+                                      final RxTaskScheduler rxTaskScheduler,
+                                      final EventBuilder eventBuilder,
+                                      final AsyncEventService asyncEventService,
+                                      @Assisted final String collectionName ) {
+        super( graphManagerFactory, rxTaskScheduler, eventBuilder, asyncEventService );
         this.collectionName = collectionName;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
index efe94db..b2d368b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 
@@ -41,8 +43,12 @@ public class ReadGraphConnectionFilter extends AbstractReadGraphFilter {
      * Create a new instance of our command
      */
     @Inject
-    public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, final RxTaskScheduler rxTaskScheduler, @Assisted final String connectionName ) {
-        super( graphManagerFactory, rxTaskScheduler );
+    public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory,
+                                      final RxTaskScheduler rxTaskScheduler,
+                                      final EventBuilder eventBuilder,
+                                      final AsyncEventService asyncEventService,
+                                      @Assisted final String connectionName ) {
+        super( graphManagerFactory, rxTaskScheduler, eventBuilder, asyncEventService );
         this.connectionName = connectionName;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
index e4eb5fc..343cc77 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
@@ -157,26 +157,26 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
 
         //get all edges pointing to the target node and buffer then into groups for deletion
         Observable<MarkedEdge> targetEdges =
-                getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null, null ) )
-                        .subscribeOn( Schedulers.io() ).flatMap( edgeType -> Observable.create( new ObservableIterator<MarkedEdge>( "getTargetEdges" ) {
+                getEdgesTypesToTarget(scope, new SimpleSearchEdgeType(node, null, null))
+                        .flatMap(edgeType -> Observable.create(new ObservableIterator<MarkedEdge>("getTargetEdges") {
                             @Override
                             protected Iterator<MarkedEdge> getIterator() {
-                                return storageSerialization.getEdgesToTarget( scope,
-                                        new SimpleSearchByEdgeType( node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING,  Optional.<Edge>absent() ) );
+                                return storageSerialization.getEdgesToTarget(scope,
+                                    new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()));
                             }
-                        } ) );
+                        }));
 
 
         //get all edges pointing to the source node and buffer them into groups for deletion
         Observable<MarkedEdge> sourceEdges =
-                getEdgesTypesFromSource( scope, new SimpleSearchEdgeType( node, null, null ) )
-                        .subscribeOn( Schedulers.io() ).flatMap( edgeType -> Observable.create( new ObservableIterator<MarkedEdge>( "getSourceEdges" ) {
+                getEdgesTypesFromSource(scope, new SimpleSearchEdgeType(node, null, null))
+                        .flatMap(edgeType -> Observable.create(new ObservableIterator<MarkedEdge>("getSourceEdges") {
                             @Override
                             protected Iterator<MarkedEdge> getIterator() {
-                                return storageSerialization.getEdgesFromSource( scope,
-                                        new SimpleSearchByEdgeType( node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING,  Optional.<Edge>absent() ) );
+                                return storageSerialization.getEdgesFromSource(scope,
+                                    new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()));
                             }
-                        } ) );
+                        }));
 
         //merge both source and target into 1 observable.  We'll need to check them all regardless of order
         return Observable.merge( targetEdges, sourceEdges )
@@ -235,12 +235,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
 
                     //run both the source/target edge type cleanup, then proceed
                     return Observable.merge( sourceMetaCleanup, targetMetaCleanup ).lastOrDefault( null )
-                                     .flatMap( new Func1<Integer, Observable<MarkedEdge>>() {
-                                         @Override
-                                         public Observable<MarkedEdge> call( final Integer integer ) {
-                                             return Observable.from( markedEdges );
-                                         }
-                                     } );
+                                     .flatMap(integer -> Observable.from( markedEdges ));
                 } );
     }
 


[4/4] usergrid git commit: Merge branch 'delete-event-updates' into 2.1-release

Posted by to...@apache.org.
Merge branch 'delete-event-updates' into 2.1-release


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/26860545
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/26860545
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/26860545

Branch: refs/heads/2.1-release
Commit: 26860545ece19361569370bc63496a01bb48738d
Parents: e018c1e 76476f1
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Oct 27 14:40:23 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Oct 27 14:40:23 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 68 ++++++++++--------
 .../asyncevents/AsyncEventService.java          |  5 ++
 .../asyncevents/InMemoryAsyncEventService.java  |  5 ++
 .../index/IndexProcessorFig.java                | 13 +++-
 .../read/traverse/AbstractReadGraphFilter.java  | 72 +++++++++++++++++---
 .../traverse/ReadGraphCollectionFilter.java     | 10 ++-
 .../traverse/ReadGraphConnectionFilter.java     | 10 ++-
 .../impl/stage/NodeDeleteListenerImpl.java      | 27 +++-----
 8 files changed, 150 insertions(+), 60 deletions(-)
----------------------------------------------------------------------



[3/4] usergrid git commit: Updates the message flow to allow for multiple processor threads per SQS take thread

Posted by to...@apache.org.
Updates the message flow to allow for multiple processor threads per SQS take thread


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/76476f17
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/76476f17
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/76476f17

Branch: refs/heads/2.1-release
Commit: 76476f17cf8e8be6f01660db3d21110eda8247f5
Parents: 1b43bda
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Oct 27 14:35:34 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Oct 27 14:35:34 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 68 +++++++++++---------
 .../index/IndexProcessorFig.java                | 13 +++-
 2 files changed, 51 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/76476f17/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index d93e304..6b9abbc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -679,35 +679,45 @@ public class AmazonAsyncEventService implements AsyncEventService {
                             }
                             while ( true );
                         }
-                    } )
-                            //this won't block our read loop, just reads and proceeds
-                            .map( messages -> {
-                                if ( messages == null || messages.size() == 0 ) {
-                                    return null;
-                                }
-
-                                try {
-                                    List<IndexEventResult> indexEventResults = callEventHandlers( messages );
-                                    List<QueueMessage> messagesToAck = submitToIndex( indexEventResults );
-                                    if ( messagesToAck == null || messagesToAck.size() == 0 ) {
-                                        logger.error( "No messages came back from the queue operation should have seen "
-                                            + messages.size(), messages );
-                                        return messagesToAck;
-                                    }
-                                    if ( messagesToAck.size() < messages.size() ) {
-                                        logger.error( "Missing messages from queue post operation", messages,
-                                            messagesToAck );
-                                    }
-                                    //ack each message, but only if we didn't error.
-                                    ack( messagesToAck );
-                                    return messagesToAck;
-                                }
-                                catch ( Exception e ) {
-                                    logger.error( "failed to ack messages to sqs", e );
-                                    return null;
-                                    //do not rethrow so we can process all of them
-                                }
-                            } );
+                    } )        //this won't block our read loop, just reads and proceeds
+                        .flatMap( sqsMessages -> {
+
+                            //do this on a different schedule, and introduce concurrency with flatmap for faster processing
+                            return Observable.just( sqsMessages )
+
+                                             .map( messages -> {
+                                                 if ( messages == null || messages.size() == 0 ) {
+                                                     return null;
+                                                 }
+
+                                                 try {
+                                                     List<IndexEventResult> indexEventResults =
+                                                         callEventHandlers( messages );
+                                                     List<QueueMessage> messagesToAck =
+                                                         submitToIndex( indexEventResults );
+                                                     if ( messagesToAck == null || messagesToAck.size() == 0 ) {
+                                                         logger.error(
+                                                             "No messages came back from the queue operation should "
+                                                                 + "have seen "
+                                                                 + messages.size(), messages );
+                                                         return messagesToAck;
+                                                     }
+                                                     if ( messagesToAck.size() < messages.size() ) {
+                                                         logger.error( "Missing messages from queue post operation",
+                                                             messages, messagesToAck );
+                                                     }
+                                                     //ack each message, but only if we didn't error.
+                                                     ack( messagesToAck );
+                                                     return messagesToAck;
+                                                 }
+                                                 catch ( Exception e ) {
+                                                     logger.error( "failed to ack messages to sqs", e );
+                                                     return null;
+                                                     //do not rethrow so we can process all of them
+                                                 }
+                                             } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
+                            //end flatMap
+                        }, indexProcessorFig.getEventConcurrencyFactor() );
 
             //start in the background
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/76476f17/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 7650c62..9d02717 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -36,6 +36,8 @@ public interface IndexProcessorFig extends GuicyFig {
 
     String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count";
 
+    String EVENT_CONCURRENCY_FACTOR = "event.concurrency.factor";
+
     String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
 
     String INDEX_QUEUE_READ_TIMEOUT = "elasticsearch.queue_read_timeout";
@@ -70,9 +72,18 @@ public interface IndexProcessorFig extends GuicyFig {
     int getIndexQueueVisibilityTimeout();
 
     /**
+     * The number of worker threads used when handing off messages from the SQS thread
+     */
+    @Default( "20" )
+    @Key( EVENT_CONCURRENCY_FACTOR )
+    int getEventConcurrencyFactor();
+
+
+
+    /**
      * The number of worker threads used to read index write requests from the queue.
      */
-    @Default( "16" )
+    @Default( "8" )
     @Key( ELASTICSEARCH_WORKER_COUNT )
     int getWorkerCount();