You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2015/10/26 17:58:13 UTC

[3/7] usergrid git commit: Upgrades RX to latest stable

Upgrades RX to the latest stable version (1.0.12 -> 1.0.14)

Adds deleted markers for the source and target nodes on edges


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4e51d383
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4e51d383
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4e51d383

Branch: refs/heads/2.1-release
Commit: 4e51d3839cecc9d55ea0bc4f65d5610d29b34fb9
Parents: f73ac4c
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Oct 23 14:16:17 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Oct 23 14:16:17 2015 -0600

----------------------------------------------------------------------
 .../read/traverse/AbstractReadGraphFilter.java  |  2 +-
 .../usergrid/persistence/graph/MarkedEdge.java  | 13 +++++
 .../graph/impl/GraphManagerImpl.java            | 54 ++++++++++----------
 .../graph/impl/SimpleMarkedEdge.java            | 33 ++++++++++--
 stack/corepersistence/pom.xml                   |  2 +-
 5 files changed, 71 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 88c912a..d3e0345 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -85,7 +85,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
             /**
              * TODO, pass a message with pointers to our cursor values to be generated later
              */
-            return graphManager.loadEdgesFromSource( search ).onBackpressureBlock()
+            return graphManager.loadEdgesFromSource( search )
                 //set the edge state for cursors
                 .doOnNext( edge -> {
                     logger.trace( "Seeking over edge {}", edge );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
index 4b5eeaa..da6fedb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
@@ -38,4 +38,17 @@ public interface MarkedEdge extends Edge{
      */
     boolean isDeleted();
 
+    /**
+     * Return true if the source node is deleted
+     * @return
+     */
+    boolean isSourceNodeDelete();
+
+    /**
+     * Return true if the target node is deleted
+     * @return
+     */
+    boolean isTargetNodeDeleted();
+
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index c1e9cea..1bcb398 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -291,7 +291,7 @@ public class GraphManagerImpl implements GraphManager {
                     return storageEdgeSerialization.getEdgeVersions( scope, searchByEdge );
                 }
             } ).buffer( graphFig.getScanPageSize() )
-                      .compose( new EdgeBufferFilter( searchByEdge.getMaxTimestamp(), searchByEdge.filterMarked() ) );
+                      .compose( new EdgeBufferFilter( searchByEdge.filterMarked() ) );
 
         return ObservableTimer.time( edges, loadEdgesVersionsTimer );
     }
@@ -306,7 +306,7 @@ public class GraphManagerImpl implements GraphManager {
                     return storageEdgeSerialization.getEdgesFromSource( scope, search );
                 }
             } ).buffer( graphFig.getScanPageSize() )
-                      .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+                      .compose( new EdgeBufferFilter( search.filterMarked() ) );
 
         return ObservableTimer.time( edges, loadEdgesFromSourceTimer );
     }
@@ -321,7 +321,7 @@ public class GraphManagerImpl implements GraphManager {
                     return storageEdgeSerialization.getEdgesToTarget( scope, search );
                 }
             } ).buffer( graphFig.getScanPageSize() )
-                      .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+                      .compose( new EdgeBufferFilter( search.filterMarked() ) );
 
 
         return ObservableTimer.time( edges, loadEdgesToTargetTimer );
@@ -337,7 +337,7 @@ public class GraphManagerImpl implements GraphManager {
                     return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search );
                 }
             } ).buffer( graphFig.getScanPageSize() )
-                      .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+                      .compose( new EdgeBufferFilter( search.filterMarked() ) );
 
         return ObservableTimer.time( edges, loadEdgesFromSourceByTypeTimer );
     }
@@ -352,7 +352,7 @@ public class GraphManagerImpl implements GraphManager {
                     return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search );
                 }
             } ).buffer( graphFig.getScanPageSize() )
-                      .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+                      .compose( new EdgeBufferFilter(  search.filterMarked() ) );
 
         return ObservableTimer.time( edges, loadEdgesToTargetByTypeTimer );
     }
@@ -420,12 +420,10 @@ public class GraphManagerImpl implements GraphManager {
         Observable.Transformer<List<MarkedEdge>, MarkedEdge> {//implements Func1<List<MarkedEdge>,
         // Observable<MarkedEdge>> {
 
-        private final long maxVersion;
         private final boolean filterMarked;
 
 
-        private EdgeBufferFilter( final long maxVersion, final boolean filterMarked ) {
-            this.maxVersion = maxVersion;
+        private EdgeBufferFilter( final boolean filterMarked ) {
             this.filterMarked = filterMarked;
         }
 
@@ -444,23 +442,16 @@ public class GraphManagerImpl implements GraphManager {
 
                 final Observable<MarkedEdge> markedEdgeObservable = Observable.from( markedEdges );
 
-                /**
-                 * We aren't going to filter anything, return exactly what we're passed
-                 */
-                if(!filterMarked){
-                    return markedEdgeObservable;
-                }
-
                 //We need to filter, perform that filter
                 final Map<Id, Long> markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges );
 
-                return markedEdgeObservable.filter( edge -> {
-                    final long edgeTimestamp = edge.getTimestamp();
+                return markedEdgeObservable.map( edge -> {
 
-                    //our edge needs to not be deleted and have a version that's > max Version
-                    if ( edge.isDeleted() ) {
-                        return false;
-                    }
+                    /**
+                     * Make sure we mark source and target deleted nodes as such
+                     */
+
+                    final long edgeTimestamp = edge.getTimestamp();
 
 
                     final Long sourceTimestamp = markedVersions.get( edge.getSourceNode() );
@@ -468,22 +459,29 @@ public class GraphManagerImpl implements GraphManager {
                     //the source Id has been marked for deletion.  It's version is <= to the marked version for
                     // deletion,
                     // so we need to discard it
-                    if ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ) {
-                        return false;
-                    }
+                    final boolean isSourceDeleted =  ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 );
 
                     final Long targetTimestamp = markedVersions.get( edge.getTargetNode() );
 
                     //the target Id has been marked for deletion.  It's version is <= to the marked version for
                     // deletion,
                     // so we need to discard it
-                    if ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ) {
-                        return false;
+                    final  boolean isTargetDeleted = ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 );
+
+                    //one has been marked for deletion, return it
+                    if(isSourceDeleted || isTargetDeleted){
+                        return new SimpleMarkedEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), edge.isDeleted(), isSourceDeleted, isTargetDeleted );
                     }
 
+                    return edge;
+                } ).filter( simpleMarkedEdge -> {
+                    if(!filterMarked){
+                        return true;
+                    }
 
-                    return true;
-                } );
+                    //if any one of these is true, we filter it
+                    return !(simpleMarkedEdge.isDeleted() || simpleMarkedEdge.isSourceNodeDelete() || simpleMarkedEdge.isTargetNodeDeleted());
+                });
             } );
         }
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
index 29d90eb..c6dc2e4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
@@ -34,12 +34,23 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 public class SimpleMarkedEdge extends  SimpleEdge implements MarkedEdge {
 
     private final boolean deleted;
+    private final boolean isSourceNodeDeleted;
+    private final boolean isTargetNodeDeleted;
 
 
     public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp, final boolean deleted) {
 
-        super(sourceNode, type, targetNode, timestamp);
+        this( sourceNode, type, targetNode, timestamp, deleted, false, false );
+    }
+
+
+    public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp,
+                             final boolean deleted, final boolean isSourceNodeDeleted,
+                             final boolean isTargetNodeDeleted ) {
+        super( sourceNode, type, targetNode, timestamp );
         this.deleted = deleted;
+        this.isSourceNodeDeleted = isSourceNodeDeleted;
+        this.isTargetNodeDeleted = isTargetNodeDeleted;
     }
 
 
@@ -56,6 +67,18 @@ public class SimpleMarkedEdge extends  SimpleEdge implements MarkedEdge {
 
 
     @Override
+    public boolean isSourceNodeDelete() {
+        return isSourceNodeDeleted;
+    }
+
+
+    @Override
+    public boolean isTargetNodeDeleted() {
+        return isTargetNodeDeleted;
+    }
+
+
+    @Override
     public boolean equals( final Object o ) {
         if ( this == o ) {
             return true;
@@ -81,6 +104,8 @@ public class SimpleMarkedEdge extends  SimpleEdge implements MarkedEdge {
     public int hashCode() {
         int result = super.hashCode();
         result = 31 * result + ( deleted ? 1 : 0 );
+        result = 31 * result + ( isSourceNodeDeleted ? 1 : 0 );
+        result = 31 * result + ( isTargetNodeDeleted ? 1 : 0 );
         return result;
     }
 
@@ -88,8 +113,10 @@ public class SimpleMarkedEdge extends  SimpleEdge implements MarkedEdge {
     @Override
     public String toString() {
         return "SimpleMarkedEdge{" +
-                "deleted=" + deleted +
-                "} " + super.toString();
+            "deleted=" + deleted +
+            ", isSourceNodeDeleted=" + isSourceNodeDeleted +
+            ", isTargetNodeDeleted=" + isTargetNodeDeleted +
+            "} " + super.toString();
     }
 }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 4b47bc0..4e4648e 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -70,7 +70,7 @@ limitations under the License.
         <junit.version>4.11</junit.version>
         <kryo-serializers.version>0.26</kryo-serializers.version>
         <log4j.version>1.2.17</log4j.version>
-        <rx.version>1.0.12</rx.version>
+        <rx.version>1.0.14</rx.version>
         <slf4j.version>1.7.2</slf4j.version>
         <surefire.version>2.16</surefire.version>
         <aws.version>1.10.6</aws.version>