You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/10/23 22:16:41 UTC
[18/18] usergrid git commit: Upgrades RX to latest stable
Upgrades RX to latest stable
Adds deleted markers for the nodes on edges
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4e51d383
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4e51d383
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4e51d383
Branch: refs/heads/USERGRID-1052
Commit: 4e51d3839cecc9d55ea0bc4f65d5610d29b34fb9
Parents: f73ac4c
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Oct 23 14:16:17 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Oct 23 14:16:17 2015 -0600
----------------------------------------------------------------------
.../read/traverse/AbstractReadGraphFilter.java | 2 +-
.../usergrid/persistence/graph/MarkedEdge.java | 13 +++++
.../graph/impl/GraphManagerImpl.java | 54 ++++++++++----------
.../graph/impl/SimpleMarkedEdge.java | 33 ++++++++++--
stack/corepersistence/pom.xml | 2 +-
5 files changed, 71 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 88c912a..d3e0345 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -85,7 +85,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
/**
* TODO, pass a message with pointers to our cursor values to be generated later
*/
- return graphManager.loadEdgesFromSource( search ).onBackpressureBlock()
+ return graphManager.loadEdgesFromSource( search )
//set the edge state for cursors
.doOnNext( edge -> {
logger.trace( "Seeking over edge {}", edge );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
index 4b5eeaa..da6fedb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
@@ -38,4 +38,17 @@ public interface MarkedEdge extends Edge{
*/
boolean isDeleted();
+ /**
+ * Return true if the source node is deleted
+ * @return {@code true} if the source node is deleted, {@code false} otherwise
+ */
+ boolean isSourceNodeDelete();
+
+ /**
+ * Return true if the target node is deleted
+ * @return {@code true} if the target node is deleted, {@code false} otherwise
+ */
+ boolean isTargetNodeDeleted();
+
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index c1e9cea..1bcb398 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -291,7 +291,7 @@ public class GraphManagerImpl implements GraphManager {
return storageEdgeSerialization.getEdgeVersions( scope, searchByEdge );
}
} ).buffer( graphFig.getScanPageSize() )
- .compose( new EdgeBufferFilter( searchByEdge.getMaxTimestamp(), searchByEdge.filterMarked() ) );
+ .compose( new EdgeBufferFilter( searchByEdge.filterMarked() ) );
return ObservableTimer.time( edges, loadEdgesVersionsTimer );
}
@@ -306,7 +306,7 @@ public class GraphManagerImpl implements GraphManager {
return storageEdgeSerialization.getEdgesFromSource( scope, search );
}
} ).buffer( graphFig.getScanPageSize() )
- .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+ .compose( new EdgeBufferFilter( search.filterMarked() ) );
return ObservableTimer.time( edges, loadEdgesFromSourceTimer );
}
@@ -321,7 +321,7 @@ public class GraphManagerImpl implements GraphManager {
return storageEdgeSerialization.getEdgesToTarget( scope, search );
}
} ).buffer( graphFig.getScanPageSize() )
- .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+ .compose( new EdgeBufferFilter( search.filterMarked() ) );
return ObservableTimer.time( edges, loadEdgesToTargetTimer );
@@ -337,7 +337,7 @@ public class GraphManagerImpl implements GraphManager {
return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search );
}
} ).buffer( graphFig.getScanPageSize() )
- .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+ .compose( new EdgeBufferFilter( search.filterMarked() ) );
return ObservableTimer.time( edges, loadEdgesFromSourceByTypeTimer );
}
@@ -352,7 +352,7 @@ public class GraphManagerImpl implements GraphManager {
return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search );
}
} ).buffer( graphFig.getScanPageSize() )
- .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+ .compose( new EdgeBufferFilter( search.filterMarked() ) );
return ObservableTimer.time( edges, loadEdgesToTargetByTypeTimer );
}
@@ -420,12 +420,10 @@ public class GraphManagerImpl implements GraphManager {
Observable.Transformer<List<MarkedEdge>, MarkedEdge> {//implements Func1<List<MarkedEdge>,
// Observable<MarkedEdge>> {
- private final long maxVersion;
private final boolean filterMarked;
- private EdgeBufferFilter( final long maxVersion, final boolean filterMarked ) {
- this.maxVersion = maxVersion;
+ private EdgeBufferFilter( final boolean filterMarked ) {
this.filterMarked = filterMarked;
}
@@ -444,23 +442,16 @@ public class GraphManagerImpl implements GraphManager {
final Observable<MarkedEdge> markedEdgeObservable = Observable.from( markedEdges );
- /**
- * We aren't going to filter anything, return exactly what we're passed
- */
- if(!filterMarked){
- return markedEdgeObservable;
- }
-
//We need to filter, perform that filter
final Map<Id, Long> markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges );
- return markedEdgeObservable.filter( edge -> {
- final long edgeTimestamp = edge.getTimestamp();
+ return markedEdgeObservable.map( edge -> {
- //our edge needs to not be deleted and have a version that's > max Version
- if ( edge.isDeleted() ) {
- return false;
- }
+ /**
+ * Make sure we mark source and target deleted nodes as such
+ */
+
+ final long edgeTimestamp = edge.getTimestamp();
final Long sourceTimestamp = markedVersions.get( edge.getSourceNode() );
@@ -468,22 +459,29 @@ public class GraphManagerImpl implements GraphManager {
//the source Id has been marked for deletion. It's version is <= to the marked version for
// deletion,
// so we need to discard it
- if ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ) {
- return false;
- }
+ final boolean isSourceDeleted = ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 );
final Long targetTimestamp = markedVersions.get( edge.getTargetNode() );
//the target Id has been marked for deletion. It's version is <= to the marked version for
// deletion,
// so we need to discard it
- if ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ) {
- return false;
+ final boolean isTargetDeleted = ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 );
+
+ //at least one node is marked deleted, return a copy of the edge that carries the source/target deleted flags
+ if(isSourceDeleted || isTargetDeleted){
+ return new SimpleMarkedEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), edge.isDeleted(), isSourceDeleted, isTargetDeleted );
}
+ return edge;
+ } ).filter( simpleMarkedEdge -> {
+ if(!filterMarked){
+ return true;
+ }
- return true;
- } );
+ //if any one of these is true, the edge is filtered out (not emitted)
+ return !(simpleMarkedEdge.isDeleted() || simpleMarkedEdge.isSourceNodeDelete() || simpleMarkedEdge.isTargetNodeDeleted());
+ });
} );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
index 29d90eb..c6dc2e4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
@@ -34,12 +34,23 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge {
private final boolean deleted;
+ private final boolean isSourceNodeDeleted;
+ private final boolean isTargetNodeDeleted;
public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp, final boolean deleted) {
- super(sourceNode, type, targetNode, timestamp);
+ this( sourceNode, type, targetNode, timestamp, deleted, false, false );
+ }
+
+
+ public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp,
+ final boolean deleted, final boolean isSourceNodeDeleted,
+ final boolean isTargetNodeDeleted ) {
+ super( sourceNode, type, targetNode, timestamp );
this.deleted = deleted;
+ this.isSourceNodeDeleted = isSourceNodeDeleted;
+ this.isTargetNodeDeleted = isTargetNodeDeleted;
}
@@ -56,6 +67,18 @@ public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge {
@Override
+ public boolean isSourceNodeDelete() {
+ return isSourceNodeDeleted;
+ }
+
+
+ @Override
+ public boolean isTargetNodeDeleted() {
+ return isTargetNodeDeleted;
+ }
+
+
+ @Override
public boolean equals( final Object o ) {
if ( this == o ) {
return true;
@@ -81,6 +104,8 @@ public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge {
public int hashCode() {
int result = super.hashCode();
result = 31 * result + ( deleted ? 1 : 0 );
+ result = 31 * result + ( isSourceNodeDeleted ? 1 : 0 );
+ result = 31 * result + ( isTargetNodeDeleted ? 1 : 0 );
return result;
}
@@ -88,8 +113,10 @@ public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge {
@Override
public String toString() {
return "SimpleMarkedEdge{" +
- "deleted=" + deleted +
- "} " + super.toString();
+ "deleted=" + deleted +
+ ", isSourceNodeDeleted=" + isSourceNodeDeleted +
+ ", isTargetNodeDeleted=" + isTargetNodeDeleted +
+ "} " + super.toString();
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 4b47bc0..4e4648e 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -70,7 +70,7 @@ limitations under the License.
<junit.version>4.11</junit.version>
<kryo-serializers.version>0.26</kryo-serializers.version>
<log4j.version>1.2.17</log4j.version>
- <rx.version>1.0.12</rx.version>
+ <rx.version>1.0.14</rx.version>
<slf4j.version>1.7.2</slf4j.version>
<surefire.version>2.16</surefire.version>
<aws.version>1.10.6</aws.version>