You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/10/31 00:32:44 UTC
[10/50] [abbrv] usergrid git commit: Added back pressure block as a
test
Added back pressure block as a test
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f73ac4c7
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f73ac4c7
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f73ac4c7
Branch: refs/heads/master
Commit: f73ac4c724088150bc1e0315942e93306321a72a
Parents: d35fea5
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Oct 23 10:45:10 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Oct 23 10:50:27 2015 -0600
----------------------------------------------------------------------
.../corepersistence/asyncevents/EventBuilderImpl.java | 3 ++-
.../usergrid/corepersistence/pipeline/read/FilterResult.java | 7 +++++++
.../pipeline/read/traverse/AbstractReadGraphFilter.java | 2 +-
.../usergrid/persistence/graph/impl/GraphManagerImpl.java | 8 ++++----
4 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index cc0356b..18f080b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -155,7 +155,8 @@ public class EventBuilderImpl implements EventBuilder {
//observable of entries as the batches are deleted
final Observable<List<MvccLogEntry>> entries =
ecm.getVersions( entityId ).buffer( serializationFig.getBufferSize() )
- .doOnNext( buffer -> ecm.delete( buffer ) ).doOnCompleted( () -> gm.compactNode( entityId ) );
+ .doOnNext( buffer -> ecm.delete( buffer ) ).doOnCompleted( () -> gm.compactNode( entityId ).toBlocking().lastOrDefault(null) );
+
return new EntityDeleteResults( edgeObservable, entries );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
index 3c41a2b..915af03 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
@@ -53,4 +53,11 @@ public class FilterResult<T> {
}
+ @Override
+ public String toString() {
+ return "FilterResult{" +
+ "path=" + path +
+ ", value=" + value +
+ '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index d3e0345..88c912a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -85,7 +85,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
/**
* TODO, pass a message with pointers to our cursor values to be generated later
*/
- return graphManager.loadEdgesFromSource( search )
+ return graphManager.loadEdgesFromSource( search ).onBackpressureBlock()
//set the edge state for cursors
.doOnNext( edge -> {
logger.trace( "Seeking over edge {}", edge );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index e119c59..c1e9cea 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -300,7 +300,7 @@ public class GraphManagerImpl implements GraphManager {
@Override
public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
final Observable<Edge> edges =
- Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
+ Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesFromSource" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
return storageEdgeSerialization.getEdgesFromSource( scope, search );
@@ -315,7 +315,7 @@ public class GraphManagerImpl implements GraphManager {
@Override
public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
final Observable<Edge> edges =
- Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
+ Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesToTarget" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
return storageEdgeSerialization.getEdgesToTarget( scope, search );
@@ -331,7 +331,7 @@ public class GraphManagerImpl implements GraphManager {
@Override
public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
final Observable<Edge> edges =
- Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
+ Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesFromSourceByType" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search );
@@ -346,7 +346,7 @@ public class GraphManagerImpl implements GraphManager {
@Override
public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
final Observable<Edge> edges =
- Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
+ Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesToTargetByType" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search );