You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/10/31 00:32:44 UTC

[10/50] [abbrv] usergrid git commit: Added back pressure block as a test

Added back pressure block as a test


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f73ac4c7
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f73ac4c7
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f73ac4c7

Branch: refs/heads/master
Commit: f73ac4c724088150bc1e0315942e93306321a72a
Parents: d35fea5
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Oct 23 10:45:10 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Oct 23 10:50:27 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/asyncevents/EventBuilderImpl.java        | 3 ++-
 .../usergrid/corepersistence/pipeline/read/FilterResult.java | 7 +++++++
 .../pipeline/read/traverse/AbstractReadGraphFilter.java      | 2 +-
 .../usergrid/persistence/graph/impl/GraphManagerImpl.java    | 8 ++++----
 4 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index cc0356b..18f080b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -155,7 +155,8 @@ public class EventBuilderImpl implements EventBuilder {
         //observable of entries as the batches are deleted
         final Observable<List<MvccLogEntry>> entries =
             ecm.getVersions( entityId ).buffer( serializationFig.getBufferSize() )
-               .doOnNext( buffer -> ecm.delete( buffer ) ).doOnCompleted( () -> gm.compactNode( entityId ) );
+               .doOnNext( buffer -> ecm.delete( buffer ) ).doOnCompleted( () -> gm.compactNode( entityId ).toBlocking().lastOrDefault(null) );
+
 
 
         return new EntityDeleteResults( edgeObservable, entries );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
index 3c41a2b..915af03 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
@@ -53,4 +53,11 @@ public class FilterResult<T> {
     }
 
 
+    @Override
+    public String toString() {
+        return "FilterResult{" +
+            "path=" + path +
+            ", value=" + value +
+            '}';
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index d3e0345..88c912a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -85,7 +85,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
             /**
              * TODO, pass a message with pointers to our cursor values to be generated later
              */
-            return graphManager.loadEdgesFromSource( search )
+            return graphManager.loadEdgesFromSource( search ).onBackpressureBlock()
                 //set the edge state for cursors
                 .doOnNext( edge -> {
                     logger.trace( "Seeking over edge {}", edge );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index e119c59..c1e9cea 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -300,7 +300,7 @@ public class GraphManagerImpl implements GraphManager {
     @Override
     public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
         final Observable<Edge> edges =
-            Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
+            Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesFromSource" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return storageEdgeSerialization.getEdgesFromSource( scope, search );
@@ -315,7 +315,7 @@ public class GraphManagerImpl implements GraphManager {
     @Override
     public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
         final Observable<Edge> edges =
-            Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
+            Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesToTarget" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return storageEdgeSerialization.getEdgesToTarget( scope, search );
@@ -331,7 +331,7 @@ public class GraphManagerImpl implements GraphManager {
     @Override
     public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
         final Observable<Edge> edges =
-            Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
+            Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesFromSourceByType" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search );
@@ -346,7 +346,7 @@ public class GraphManagerImpl implements GraphManager {
     @Override
     public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
         final Observable<Edge> edges =
-            Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
+            Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesToTargetByType" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search );