You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2015/10/26 17:58:11 UTC

[1/7] usergrid git commit: Added back pressure block as a test

Repository: usergrid
Updated Branches:
  refs/heads/2.1-release d35fea563 -> b025dda98


Added back pressure block as a test


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f73ac4c7
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f73ac4c7
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f73ac4c7

Branch: refs/heads/2.1-release
Commit: f73ac4c724088150bc1e0315942e93306321a72a
Parents: d35fea5
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Oct 23 10:45:10 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Oct 23 10:50:27 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/asyncevents/EventBuilderImpl.java        | 3 ++-
 .../usergrid/corepersistence/pipeline/read/FilterResult.java | 7 +++++++
 .../pipeline/read/traverse/AbstractReadGraphFilter.java      | 2 +-
 .../usergrid/persistence/graph/impl/GraphManagerImpl.java    | 8 ++++----
 4 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index cc0356b..18f080b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -155,7 +155,8 @@ public class EventBuilderImpl implements EventBuilder {
         //observable of entries as the batches are deleted
         final Observable<List<MvccLogEntry>> entries =
             ecm.getVersions( entityId ).buffer( serializationFig.getBufferSize() )
-               .doOnNext( buffer -> ecm.delete( buffer ) ).doOnCompleted( () -> gm.compactNode( entityId ) );
+               .doOnNext( buffer -> ecm.delete( buffer ) ).doOnCompleted( () -> gm.compactNode( entityId ).toBlocking().lastOrDefault(null) );
+
 
 
         return new EntityDeleteResults( edgeObservable, entries );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
index 3c41a2b..915af03 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
@@ -53,4 +53,11 @@ public class FilterResult<T> {
     }
 
 
+    @Override
+    public String toString() {
+        return "FilterResult{" +
+            "path=" + path +
+            ", value=" + value +
+            '}';
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index d3e0345..88c912a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -85,7 +85,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
             /**
              * TODO, pass a message with pointers to our cursor values to be generated later
              */
-            return graphManager.loadEdgesFromSource( search )
+            return graphManager.loadEdgesFromSource( search ).onBackpressureBlock()
                 //set the edge state for cursors
                 .doOnNext( edge -> {
                     logger.trace( "Seeking over edge {}", edge );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index e119c59..c1e9cea 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -300,7 +300,7 @@ public class GraphManagerImpl implements GraphManager {
     @Override
     public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
         final Observable<Edge> edges =
-            Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
+            Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesFromSource" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return storageEdgeSerialization.getEdgesFromSource( scope, search );
@@ -315,7 +315,7 @@ public class GraphManagerImpl implements GraphManager {
     @Override
     public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
         final Observable<Edge> edges =
-            Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
+            Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesToTarget" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return storageEdgeSerialization.getEdgesToTarget( scope, search );
@@ -331,7 +331,7 @@ public class GraphManagerImpl implements GraphManager {
     @Override
     public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
         final Observable<Edge> edges =
-            Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
+            Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesFromSourceByType" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search );
@@ -346,7 +346,7 @@ public class GraphManagerImpl implements GraphManager {
     @Override
     public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
         final Observable<Edge> edges =
-            Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
+            Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesToTargetByType" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search );


[6/7] usergrid git commit: Cleaned up unused import

Posted by mr...@apache.org.
Cleaned up unused import


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/6e34b20b
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/6e34b20b
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/6e34b20b

Branch: refs/heads/2.1-release
Commit: 6e34b20b7e1e0d125c7a3d787334d8c1293487a3
Parents: a770a6d
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Oct 23 16:50:43 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Oct 23 16:50:43 2015 -0600

----------------------------------------------------------------------
 .../main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/6e34b20b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
index 88809e2..54d1596 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
@@ -20,7 +20,6 @@
 package org.apache.usergrid.persistence.graph;
 
 
-import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
 
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;


[7/7] usergrid git commit: Since delete event is also being used for read repair, there are possibilities where the ES documents are already removed, but entities and edges need cleaned from c*. Don't validate empty sets returned from processing the event as it's a possible happy path case.

Posted by mr...@apache.org.
Since the delete event is also used for read repair, there are cases where the ES documents have already been removed but the entities and edges still need to be cleaned from C*.  Don't validate empty sets returned from processing the event, as an empty set is a possible happy-path case.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b025dda9
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b025dda9
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b025dda9

Branch: refs/heads/2.1-release
Commit: b025dda98f97b964df9b63a5622aa3a735d2813a
Parents: 6e34b20
Author: Michael Russo <mi...@gmail.com>
Authored: Sat Oct 24 20:15:21 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Sat Oct 24 20:15:21 2015 -0700

----------------------------------------------------------------------
 .../corepersistence/asyncevents/AmazonAsyncEventService.java       | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/b025dda9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index fe53776..d93e304 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -324,6 +324,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
                 }
                 else if ( event instanceof EntityDeleteEvent ) {
                     indexoperationObservable = handleEntityDelete( message );
+                    validateEmptySets = false; // do not check this one for an empty set b/c it can be empty
+
                 }
                 else if ( event instanceof EntityIndexEvent ) {
                     indexoperationObservable = handleEntityIndexUpdate( message );


[2/7] usergrid git commit: De-couple entity deletes from edge removal from graph.

Posted by mr...@apache.org.
De-couple entity deletes from edge removal from graph.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/032ffd48
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/032ffd48
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/032ffd48

Branch: refs/heads/2.1-release
Commit: 032ffd48127e600d026446535cfb776f92aac000
Parents: d35fea5
Author: Michael Russo <mi...@gmail.com>
Authored: Fri Oct 23 12:59:57 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Fri Oct 23 12:59:57 2015 -0700

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    |  7 +++---
 .../asyncevents/EventBuilder.java               | 25 +++++++++++++++-----
 .../asyncevents/EventBuilderImpl.java           | 10 ++++----
 .../asyncevents/InMemoryAsyncEventService.java  |  5 ++--
 4 files changed, 31 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/032ffd48/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 7034a67..fe53776 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -586,9 +586,10 @@ public class AmazonAsyncEventService implements AsyncEventService {
             entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId );
 
 
-        entityDeleteResults
-            .getEntitiesCompacted()
-            .collect(() -> new ArrayList<>(), (list, item) -> list.add(item)).toBlocking().lastOrDefault(null);
+        // Delete the entities and remove from graph separately
+        entityDeleteResults.getEntitiesDeleted().toBlocking().lastOrDefault(null);
+
+        entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null);
 
         return entityDeleteResults.getIndexObservable();
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/032ffd48/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index d246e2f..480756f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -80,17 +80,25 @@ public interface EventBuilder {
 
     /**
      * A bean to hold both our observables so the caller can choose the subscription mechanism.  Note that
-     * indexOperationMessages should be subscribed and completed BEFORE the getEntitiesCompacted is subscribed
+     * indexOperationMessages should be subscribed and completed BEFORE the getEntitiesDeleted is subscribed
      */
     final class EntityDeleteResults {
         private final Observable<IndexOperationMessage> indexOperationMessageObservable;
-        private final Observable<List<MvccLogEntry>> entitiesCompacted;
+        private final Observable<List<MvccLogEntry>> entitiesDeleted;
+
+
+
+        private final Observable<Id> compactedNode;
+
+
 
 
         public EntityDeleteResults( final Observable<IndexOperationMessage> indexOperationMessageObservable,
-                                    final Observable<List<MvccLogEntry>> entitiesCompacted ) {
+                                    final Observable<List<MvccLogEntry>> entitiesDeleted,
+                                    final Observable<Id> compactedNode) {
             this.indexOperationMessageObservable = indexOperationMessageObservable;
-            this.entitiesCompacted = entitiesCompacted;
+            this.entitiesDeleted = entitiesDeleted;
+            this.compactedNode = compactedNode;
         }
 
 
@@ -98,9 +106,14 @@ public interface EventBuilder {
             return indexOperationMessageObservable;
         }
 
+        public Observable<List<MvccLogEntry>> getEntitiesDeleted() {
+            return entitiesDeleted;
+        }
 
-        public Observable<List<MvccLogEntry>> getEntitiesCompacted() {
-            return entitiesCompacted;
+        public Observable<Id> getCompactedNode() {
+            return compactedNode;
         }
+
+
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/032ffd48/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index cc0356b..d819f39 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -128,8 +128,6 @@ public class EventBuilderImpl implements EventBuilder {
 
         final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
 
-        //needs get versions here.
-
 
         //TODO: change this to be an observable
         //so we get these versions and loop through them until we find the MvccLogEntry that is marked as delete.
@@ -155,10 +153,14 @@ public class EventBuilderImpl implements EventBuilder {
         //observable of entries as the batches are deleted
         final Observable<List<MvccLogEntry>> entries =
             ecm.getVersions( entityId ).buffer( serializationFig.getBufferSize() )
-               .doOnNext( buffer -> ecm.delete( buffer ) ).doOnCompleted( () -> gm.compactNode( entityId ) );
+               .doOnNext( buffer -> ecm.delete( buffer ) );
+
+
+        // observable of the edge delete from graph
+        final Observable<Id> compactedNode = gm.compactNode(entityId);
 
 
-        return new EntityDeleteResults( edgeObservable, entries );
+        return new EntityDeleteResults( edgeObservable, entries, compactedNode );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/032ffd48/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index d5a0398..fc6385c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -22,8 +22,6 @@ package org.apache.usergrid.corepersistence.asyncevents;
 
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.index.impl.IndexProducer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
 import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
@@ -103,7 +101,8 @@ public class InMemoryAsyncEventService implements AsyncEventService {
             eventBuilder.buildEntityDelete( applicationScope, entityId );
 
         run( results.getIndexObservable() );
-        run( results.getEntitiesCompacted() );
+        run( results.getEntitiesDeleted() );
+        run( results.getCompactedNode() );
     }
 
 


[3/7] usergrid git commit: Upgrades RX to latest stable

Posted by mr...@apache.org.
Upgrades RX to latest stable

Adds deleted markers for the nodes on edges


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4e51d383
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4e51d383
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4e51d383

Branch: refs/heads/2.1-release
Commit: 4e51d3839cecc9d55ea0bc4f65d5610d29b34fb9
Parents: f73ac4c
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Oct 23 14:16:17 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Oct 23 14:16:17 2015 -0600

----------------------------------------------------------------------
 .../read/traverse/AbstractReadGraphFilter.java  |  2 +-
 .../usergrid/persistence/graph/MarkedEdge.java  | 13 +++++
 .../graph/impl/GraphManagerImpl.java            | 54 ++++++++++----------
 .../graph/impl/SimpleMarkedEdge.java            | 33 ++++++++++--
 stack/corepersistence/pom.xml                   |  2 +-
 5 files changed, 71 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 88c912a..d3e0345 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -85,7 +85,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
             /**
              * TODO, pass a message with pointers to our cursor values to be generated later
              */
-            return graphManager.loadEdgesFromSource( search ).onBackpressureBlock()
+            return graphManager.loadEdgesFromSource( search )
                 //set the edge state for cursors
                 .doOnNext( edge -> {
                     logger.trace( "Seeking over edge {}", edge );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
index 4b5eeaa..da6fedb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
@@ -38,4 +38,17 @@ public interface MarkedEdge extends Edge{
      */
     boolean isDeleted();
 
+    /**
+     * Return true if the source node is deleted
+     * @return
+     */
+    boolean isSourceNodeDelete();
+
+    /**
+     * Return true if the target node is deleted
+     * @return
+     */
+    boolean isTargetNodeDeleted();
+
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index c1e9cea..1bcb398 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -291,7 +291,7 @@ public class GraphManagerImpl implements GraphManager {
                     return storageEdgeSerialization.getEdgeVersions( scope, searchByEdge );
                 }
             } ).buffer( graphFig.getScanPageSize() )
-                      .compose( new EdgeBufferFilter( searchByEdge.getMaxTimestamp(), searchByEdge.filterMarked() ) );
+                      .compose( new EdgeBufferFilter( searchByEdge.filterMarked() ) );
 
         return ObservableTimer.time( edges, loadEdgesVersionsTimer );
     }
@@ -306,7 +306,7 @@ public class GraphManagerImpl implements GraphManager {
                     return storageEdgeSerialization.getEdgesFromSource( scope, search );
                 }
             } ).buffer( graphFig.getScanPageSize() )
-                      .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+                      .compose( new EdgeBufferFilter( search.filterMarked() ) );
 
         return ObservableTimer.time( edges, loadEdgesFromSourceTimer );
     }
@@ -321,7 +321,7 @@ public class GraphManagerImpl implements GraphManager {
                     return storageEdgeSerialization.getEdgesToTarget( scope, search );
                 }
             } ).buffer( graphFig.getScanPageSize() )
-                      .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+                      .compose( new EdgeBufferFilter( search.filterMarked() ) );
 
 
         return ObservableTimer.time( edges, loadEdgesToTargetTimer );
@@ -337,7 +337,7 @@ public class GraphManagerImpl implements GraphManager {
                     return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search );
                 }
             } ).buffer( graphFig.getScanPageSize() )
-                      .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+                      .compose( new EdgeBufferFilter( search.filterMarked() ) );
 
         return ObservableTimer.time( edges, loadEdgesFromSourceByTypeTimer );
     }
@@ -352,7 +352,7 @@ public class GraphManagerImpl implements GraphManager {
                     return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search );
                 }
             } ).buffer( graphFig.getScanPageSize() )
-                      .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+                      .compose( new EdgeBufferFilter(  search.filterMarked() ) );
 
         return ObservableTimer.time( edges, loadEdgesToTargetByTypeTimer );
     }
@@ -420,12 +420,10 @@ public class GraphManagerImpl implements GraphManager {
         Observable.Transformer<List<MarkedEdge>, MarkedEdge> {//implements Func1<List<MarkedEdge>,
         // Observable<MarkedEdge>> {
 
-        private final long maxVersion;
         private final boolean filterMarked;
 
 
-        private EdgeBufferFilter( final long maxVersion, final boolean filterMarked ) {
-            this.maxVersion = maxVersion;
+        private EdgeBufferFilter( final boolean filterMarked ) {
             this.filterMarked = filterMarked;
         }
 
@@ -444,23 +442,16 @@ public class GraphManagerImpl implements GraphManager {
 
                 final Observable<MarkedEdge> markedEdgeObservable = Observable.from( markedEdges );
 
-                /**
-                 * We aren't going to filter anything, return exactly what we're passed
-                 */
-                if(!filterMarked){
-                    return markedEdgeObservable;
-                }
-
                 //We need to filter, perform that filter
                 final Map<Id, Long> markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges );
 
-                return markedEdgeObservable.filter( edge -> {
-                    final long edgeTimestamp = edge.getTimestamp();
+                return markedEdgeObservable.map( edge -> {
 
-                    //our edge needs to not be deleted and have a version that's > max Version
-                    if ( edge.isDeleted() ) {
-                        return false;
-                    }
+                    /**
+                     * Make sure we mark source and target deleted nodes as such
+                     */
+
+                    final long edgeTimestamp = edge.getTimestamp();
 
 
                     final Long sourceTimestamp = markedVersions.get( edge.getSourceNode() );
@@ -468,22 +459,29 @@ public class GraphManagerImpl implements GraphManager {
                     //the source Id has been marked for deletion.  It's version is <= to the marked version for
                     // deletion,
                     // so we need to discard it
-                    if ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ) {
-                        return false;
-                    }
+                    final boolean isSourceDeleted =  ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 );
 
                     final Long targetTimestamp = markedVersions.get( edge.getTargetNode() );
 
                     //the target Id has been marked for deletion.  It's version is <= to the marked version for
                     // deletion,
                     // so we need to discard it
-                    if ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ) {
-                        return false;
+                    final  boolean isTargetDeleted = ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 );
+
+                    //one has been marked for deletion, return it
+                    if(isSourceDeleted || isTargetDeleted){
+                        return new SimpleMarkedEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), edge.isDeleted(), isSourceDeleted, isTargetDeleted );
                     }
 
+                    return edge;
+                } ).filter( simpleMarkedEdge -> {
+                    if(!filterMarked){
+                        return true;
+                    }
 
-                    return true;
-                } );
+                    //if any one of these is true, we filter it
+                    return !(simpleMarkedEdge.isDeleted() || simpleMarkedEdge.isSourceNodeDelete() || simpleMarkedEdge.isTargetNodeDeleted());
+                });
             } );
         }
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
index 29d90eb..c6dc2e4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
@@ -34,12 +34,23 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 public class SimpleMarkedEdge extends  SimpleEdge implements MarkedEdge {
 
     private final boolean deleted;
+    private final boolean isSourceNodeDeleted;
+    private final boolean isTargetNodeDeleted;
 
 
     public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp, final boolean deleted) {
 
-        super(sourceNode, type, targetNode, timestamp);
+        this( sourceNode, type, targetNode, timestamp, deleted, false, false );
+    }
+
+
+    public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp,
+                             final boolean deleted, final boolean isSourceNodeDeleted,
+                             final boolean isTargetNodeDeleted ) {
+        super( sourceNode, type, targetNode, timestamp );
         this.deleted = deleted;
+        this.isSourceNodeDeleted = isSourceNodeDeleted;
+        this.isTargetNodeDeleted = isTargetNodeDeleted;
     }
 
 
@@ -56,6 +67,18 @@ public class SimpleMarkedEdge extends  SimpleEdge implements MarkedEdge {
 
 
     @Override
+    public boolean isSourceNodeDelete() {
+        return isSourceNodeDeleted;
+    }
+
+
+    @Override
+    public boolean isTargetNodeDeleted() {
+        return isTargetNodeDeleted;
+    }
+
+
+    @Override
     public boolean equals( final Object o ) {
         if ( this == o ) {
             return true;
@@ -81,6 +104,8 @@ public class SimpleMarkedEdge extends  SimpleEdge implements MarkedEdge {
     public int hashCode() {
         int result = super.hashCode();
         result = 31 * result + ( deleted ? 1 : 0 );
+        result = 31 * result + ( isSourceNodeDeleted ? 1 : 0 );
+        result = 31 * result + ( isTargetNodeDeleted ? 1 : 0 );
         return result;
     }
 
@@ -88,8 +113,10 @@ public class SimpleMarkedEdge extends  SimpleEdge implements MarkedEdge {
     @Override
     public String toString() {
         return "SimpleMarkedEdge{" +
-                "deleted=" + deleted +
-                "} " + super.toString();
+            "deleted=" + deleted +
+            ", isSourceNodeDeleted=" + isSourceNodeDeleted +
+            ", isTargetNodeDeleted=" + isTargetNodeDeleted +
+            "} " + super.toString();
     }
 }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 4b47bc0..4e4648e 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -70,7 +70,7 @@ limitations under the License.
         <junit.version>4.11</junit.version>
         <kryo-serializers.version>0.26</kryo-serializers.version>
         <log4j.version>1.2.17</log4j.version>
-        <rx.version>1.0.12</rx.version>
+        <rx.version>1.0.14</rx.version>
         <slf4j.version>1.7.2</slf4j.version>
         <surefire.version>2.16</surefire.version>
         <aws.version>1.10.6</aws.version>


[5/7] usergrid git commit: Merge branch 'USERGRID-1062' of https://git-wip-us.apache.org/repos/asf/usergrid into USERGRID-1052

Posted by mr...@apache.org.
Merge branch 'USERGRID-1062' of https://git-wip-us.apache.org/repos/asf/usergrid into USERGRID-1052


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a770a6d7
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a770a6d7
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a770a6d7

Branch: refs/heads/2.1-release
Commit: a770a6d7b99389031fe957372cb867d6a06dfcb9
Parents: 1d4f22c 032ffd4
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Oct 23 15:48:17 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Oct 23 15:48:17 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    |  7 +++---
 .../asyncevents/EventBuilder.java               | 25 +++++++++++++++-----
 .../asyncevents/EventBuilderImpl.java           |  9 +++----
 .../asyncevents/InMemoryAsyncEventService.java  |  5 ++--
 4 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------



[4/7] usergrid git commit: Changed the interface to return MarkedEdge interface. This allows the caller to determine if they want to filter or not, as well as to perform read-repair actions on read.

Posted by mr...@apache.org.
Changed the GraphManager interface to return MarkedEdge instead of Edge.  This allows the caller to decide whether or not to filter marked edges, as well as to perform read-repair actions on read.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1d4f22c5
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1d4f22c5
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1d4f22c5

Branch: refs/heads/2.1-release
Commit: 1d4f22c59c057a6f9d52844e03897786b0773a5b
Parents: 4e51d38
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Oct 23 15:25:39 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Oct 23 15:25:39 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManagerFactory.java |  7 +-
 .../corepersistence/CpRelationManager.java      | 16 ++---
 .../read/traverse/AbstractReadGraphFilter.java  | 74 +++++++++++++++----
 .../read/traverse/EdgeCursorSerializer.java     |  8 ++-
 .../traverse/ReadGraphCollectionFilter.java     |  5 +-
 .../ReadGraphConnectionByTypeFilter.java        | 11 +--
 .../traverse/ReadGraphConnectionFilter.java     |  5 +-
 .../index/AsyncIndexServiceTest.java            |  3 +-
 .../corepersistence/index/IndexServiceTest.java | 12 ++--
 .../pipeline/cursor/CursorTest.java             | 24 +++----
 .../service/ConnectionServiceImplTest.java      |  5 +-
 .../persistence/ApplicationServiceIT.java       |  4 +-
 .../persistence/graph/GraphManager.java         | 14 ++--
 .../usergrid/persistence/graph/MarkedEdge.java  |  1 +
 .../graph/impl/GraphManagerImpl.java            | 30 ++++----
 .../graph/impl/SimpleMarkedEdge.java            | 39 ++++++----
 .../persistence/graph/GraphManagerIT.java       | 76 ++++++++++----------
 .../persistence/graph/GraphManagerLoadTest.java | 10 +--
 .../graph/GraphManagerShardConsistencyIT.java   |  6 +-
 .../graph/GraphManagerStressTest.java           | 16 ++---
 .../management/AppInfoMigrationPlugin.java      |  5 +-
 21 files changed, 220 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 4bdade5..c75a025 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -63,6 +63,7 @@ import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.index.EntityIndex;
@@ -477,9 +478,9 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         logger.debug("getApplications(): Loading edges of edgeType {} from {}:{}",
             new Object[]{edgeType, managementId.getType(), managementId.getUuid()});
 
-        Observable<Edge> edges = gm.loadEdgesFromSource( new SimpleSearchByEdgeType(
-                managementId, edgeType, Long.MAX_VALUE,
-                SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ));
+        Observable<MarkedEdge> edges = gm.loadEdgesFromSource(
+            new SimpleSearchByEdgeType( managementId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                Optional.<Edge>absent() ) );
 
         final EntityCollectionManager ecm = managerCache.getEntityCollectionManager( appScope );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index aad7610..b4cabc4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -59,6 +59,7 @@ import org.apache.usergrid.persistence.entities.Group;
 import org.apache.usergrid.persistence.entities.User;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
@@ -227,14 +228,9 @@ public class CpRelationManager implements RelationManager {
 
         Observable<Edge> edges =
             gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeType, null ) )
-              .flatMap( new Func1<String, Observable<Edge>>() {
-                  @Override
-                  public Observable<Edge> call( final String edgeType ) {
-                      return gm.loadEdgesToTarget(
-                          new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType, Long.MAX_VALUE,
-                              SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) );
-                  }
-              } );
+              .flatMap( edgeType1 -> gm.loadEdgesToTarget(
+                  new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType1, Long.MAX_VALUE,
+                      SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ) );
 
         //if our limit is set, take them.  Note this logic is still borked, we can't possibly fit everything in memmory
         if ( limit > -1 ) {
@@ -268,7 +264,7 @@ public class CpRelationManager implements RelationManager {
         } );
 
         GraphManager gm = managerCache.getGraphManager( applicationScope );
-        Observable<Edge> edges = gm.loadEdgeVersions( CpNamingUtils
+        Observable<MarkedEdge> edges = gm.loadEdgeVersions( CpNamingUtils
             .createEdgeFromConnectionType( new SimpleId( headEntity.getUuid(), headEntity.getType() ), connectionType,
                 entityId ) );
 
@@ -288,7 +284,7 @@ public class CpRelationManager implements RelationManager {
         } );
 
         GraphManager gm = managerCache.getGraphManager( applicationScope );
-        Observable<Edge> edges = gm.loadEdgeVersions( CpNamingUtils
+        Observable<MarkedEdge> edges = gm.loadEdgeVersions( CpNamingUtils
             .createEdgeFromCollectionName( new SimpleId( headEntity.getUuid(), headEntity.getType() ), collectionName,
                 entityId ) );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index d3e0345..621edd2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -23,13 +23,16 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
 import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -42,18 +45,21 @@ import rx.Observable;
 /**
  * Command for reading graph edges
  */
-public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, Edge> {
+public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, MarkedEdge> {
 
     private static final Logger logger = LoggerFactory.getLogger( AbstractReadGraphFilter.class );
 
     private final GraphManagerFactory graphManagerFactory;
+    private final AsyncEventService asyncEventService;
 
 
     /**
      * Create a new instance of our command
      */
-    public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory ) {
+    public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory,
+                                    final AsyncEventService asyncEventService ) {
         this.graphManagerFactory = graphManagerFactory;
+        this.asyncEventService = asyncEventService;
     }
 
 
@@ -61,9 +67,11 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
     public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> previousIds ) {
 
 
+        final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
+
         //get the graph manager
         final GraphManager graphManager =
-            graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
+            graphManagerFactory.createEdgeManager( applicationScope );
 
 
         final String edgeName = getEdgeTypeName();
@@ -74,18 +82,60 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
         return previousIds.flatMap( previousFilterValue -> {
 
             //set our our constant state
-            final Optional<Edge> startFromCursor = getSeekValue();
+            final Optional<MarkedEdge> startFromCursor = getSeekValue();
             final Id id = previousFilterValue.getValue();
 
 
+            final Optional<Edge> typeWrapper = Optional.fromNullable(startFromCursor.orNull());
+
+            /**
+             * We do not want to filter.  This is intentional DO NOT REMOVE!!!
+             *
+             * We want to fire events on these edges if they exist, because their presence means the delete was missed.
+             */
             final SimpleSearchByEdgeType search =
                 new SimpleSearchByEdgeType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                    startFromCursor );
+                    typeWrapper, false );
 
             /**
              * TODO, pass a message with pointers to our cursor values to be generated later
              */
-            return graphManager.loadEdgesFromSource( search )
+            return graphManager.loadEdgesFromSource( search ).filter(markedEdge -> {
+
+                final boolean isDeleted = markedEdge.isDeleted();
+                final boolean isSourceNodeDeleted = markedEdge.isSourceNodeDelete();
+                final boolean isTargetNodeDelete = markedEdge.isTargetNodeDeleted();
+
+
+
+                if(isDeleted){
+                    logger.trace( "Edge {} is deleted, queueing the delete event", markedEdge );
+                    asyncEventService.queueDeleteEdge( applicationScope, markedEdge  );
+                }
+
+                if(isSourceNodeDeleted){
+                    final Id sourceNodeId = markedEdge.getSourceNode();
+
+                    logger.trace( "Edge {} has a deleted source node, queueing the delete entity event for id {}", markedEdge, sourceNodeId );
+
+                    asyncEventService.queueEntityDelete( applicationScope, sourceNodeId );
+                }
+
+                if(isTargetNodeDelete){
+
+                    final Id targetNodeId = markedEdge.getTargetNode();
+
+                    logger.trace( "Edge {} has a deleted target node, queueing the delete entity event for id {}", markedEdge, targetNodeId );
+
+                    asyncEventService.queueEntityDelete( applicationScope, targetNodeId );
+                }
+
+
+                //filter if any of them are marked
+                return !isDeleted && !isSourceNodeDeleted && !isTargetNodeDelete;
+
+
+            })
                 //set the edge state for cursors
                 .doOnNext( edge -> {
                     logger.trace( "Seeking over edge {}", edge );
@@ -100,7 +150,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
 
 
     @Override
-    protected FilterResult<Id> createFilterResult( final Id emit, final Edge cursorValue,
+    protected FilterResult<Id> createFilterResult( final Id emit, final MarkedEdge cursorValue,
                                                    final Optional<EdgePath> parent ) {
 
         //if it's our first pass, there's no cursor to generate
@@ -113,7 +163,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
 
 
     @Override
-    protected CursorSerializer<Edge> getCursorSerializer() {
+    protected CursorSerializer<MarkedEdge> getCursorSerializer() {
         return EdgeCursorSerializer.INSTANCE;
     }
 
@@ -131,14 +181,14 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
      */
     private final class EdgeState {
 
-        private Edge cursorEdge = null;
-        private Edge currentEdge = null;
+        private MarkedEdge cursorEdge = null;
+        private MarkedEdge currentEdge = null;
 
 
         /**
          * Update the pointers
          */
-        private void update( final Edge newEdge ) {
+        private void update( final MarkedEdge newEdge ) {
             cursorEdge = currentEdge;
             currentEdge = newEdge;
         }
@@ -147,7 +197,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
         /**
          * Get the edge to use in cursors for resume
          */
-        private Edge getCursorEdge() {
+        private MarkedEdge getCursorEdge() {
             return cursorEdge;
         }
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java
index 8d9bf6f..d54e547 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java
@@ -22,20 +22,22 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer;
 import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
 
 
 /**
  * Edge cursor serializer
  */
-public class EdgeCursorSerializer extends AbstractCursorSerializer<Edge> {
+public class EdgeCursorSerializer extends AbstractCursorSerializer<MarkedEdge> {
 
 
     public static final EdgeCursorSerializer INSTANCE = new EdgeCursorSerializer();
 
     @Override
-    protected Class<SimpleEdge> getType() {
-        return SimpleEdge.class;
+    protected Class<SimpleMarkedEdge> getType() {
+        return SimpleMarkedEdge.class;
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
index dc39f5c..db5a0a8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 
 import com.google.inject.Inject;
@@ -40,8 +41,8 @@ public class ReadGraphCollectionFilter extends AbstractReadGraphFilter {
      * Create a new instance of our command
      */
     @Inject
-    public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String collectionName ) {
-        super( graphManagerFactory );
+    public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final AsyncEventService asyncEventService, @Assisted final String collectionName ) {
+        super( graphManagerFactory, asyncEventService );
         this.collectionName = collectionName;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java
index 61ba4ad..054a52b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java
@@ -26,6 +26,7 @@ import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -42,7 +43,7 @@ import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeType
 /**
  * Command for reading graph edges on a connection
  */
-public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, Edge>{
+public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, MarkedEdge>{
 
     private final GraphManagerFactory graphManagerFactory;
     private final String connectionName;
@@ -77,12 +78,14 @@ public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id,
         return filterResultObservable.flatMap( idFilterResult -> {
 
               //set our our constant state
-            final Optional<Edge> startFromCursor = getSeekValue();
+            final Optional<MarkedEdge> startFromCursor = getSeekValue();
             final Id id = idFilterResult.getValue();
 
+            final Optional<Edge> typeWrapper = Optional.fromNullable(startFromCursor.orNull());
+
             final SimpleSearchByIdType search =
                 new SimpleSearchByIdType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                    entityType, startFromCursor );
+                    entityType, typeWrapper );
 
             return graphManager.loadEdgesFromSourceByType( search ).map(
                 edge -> createFilterResult( edge.getTargetNode(), edge, idFilterResult.getPath() ));
@@ -91,7 +94,7 @@ public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id,
 
 
     @Override
-    protected CursorSerializer<Edge> getCursorSerializer() {
+    protected CursorSerializer<MarkedEdge> getCursorSerializer() {
         return EdgeCursorSerializer.INSTANCE;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
index 11ec5f8..93e8fd4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 
 import com.google.inject.Inject;
@@ -40,8 +41,8 @@ public class ReadGraphConnectionFilter extends AbstractReadGraphFilter {
      * Create a new instance of our command
      */
     @Inject
-    public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String connectionName ) {
-        super( graphManagerFactory );
+    public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory,  final AsyncEventService asyncEventService,  @Assisted final String connectionName ) {
+        super( graphManagerFactory, asyncEventService );
         this.connectionName = connectionName;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
index 2863cbf..74f9ce0 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.index.*;
 import org.junit.Before;
 import org.junit.Rule;
@@ -135,7 +136,7 @@ public abstract class AsyncIndexServiceTest {
          */
 
 
-        final List<Edge> connectionSearchEdges = Observable.range( 0, 500 ).flatMap(integer -> {
+        final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, 500 ).flatMap(integer -> {
             final Id connectingId = createId("connecting");
             final Edge edge = CpNamingUtils.createConnectionEdge(connectingId, "likes", testEntity.getId());
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
index 6001dd4..90d6c5a 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.junit.Before;
 import org.junit.Test;
@@ -244,7 +245,7 @@ public class IndexServiceTest {
 //        final int edgeCount = indexFig.getIndexBatchSize()*2;
         final int edgeCount = 100;
 
-        final List<Edge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> {
+        final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> {
             final Id connectingId = createId( "connecting" );
             final Edge edge = CpNamingUtils.createConnectionEdge( connectingId, "likes", testEntity.getId() );
 
@@ -377,7 +378,8 @@ public class IndexServiceTest {
         //Write multiple connection edges
         final int edgeCount = 5;
 
-        final List<Edge> connectionSearchEdges = createConnectionSearchEdges( testEntity, graphManager, edgeCount );
+        final List<MarkedEdge>
+            connectionSearchEdges = createConnectionSearchEdges( testEntity, graphManager, edgeCount );
 
         indexService.indexEntity( applicationScope, testEntity ).flatMap(mesage -> indexProducer.put(mesage)).toBlocking().getIterator();
 
@@ -485,10 +487,10 @@ public class IndexServiceTest {
     }
 
 
-    private List<Edge> createConnectionSearchEdges(
-        final Entity testEntity, final GraphManager graphManager, final int edgeCount ) {
+    private List<MarkedEdge> createConnectionSearchEdges( final Entity testEntity, final GraphManager graphManager,
+                                                          final int edgeCount ) {
 
-        final List<Edge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> {
+        final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> {
 
             //create our connection edge.
             final Id connectingId = createId( "connecting" );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
index 7128dcf..c9dcbf1 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
@@ -28,7 +28,9 @@ import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
 import org.apache.usergrid.corepersistence.pipeline.read.search.ElasticsearchCursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.traverse.EdgeCursorSerializer;
 import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
 
 import com.google.common.base.Optional;
 
@@ -40,19 +42,12 @@ public class CursorTest {
 
     @Test
     public void testCursors(){
+         //test encoding edge
 
+        final MarkedEdge edge1 = new SimpleMarkedEdge( createId("source1"), "edgeType1",  createId("target1"), 100, false, false, false  );
 
 
-
-
-
-
-        //test encoding edge
-
-        final Edge edge1 = new SimpleEdge( createId("source1"), "edgeType1",  createId("target1"), 100  );
-
-
-        final Edge edge2 = new SimpleEdge( createId("source2"), "edgeType2",  createId("target2"), 110  );
+        final MarkedEdge edge2 = new SimpleMarkedEdge( createId("source2"), "edgeType2",  createId("target2"), 110, false, false, false  );
 
 
 
@@ -64,11 +59,12 @@ public class CursorTest {
 
         final EdgePath<Integer> filter3Path = new EdgePath<>( 3, query2, ElasticsearchCursorSerializer.INSTANCE, Optional.absent() );
 
-        final EdgePath<Edge> filter2Path = new EdgePath<Edge>(2, edge2, EdgeCursorSerializer.INSTANCE, Optional.of( filter3Path ));
+        final EdgePath<MarkedEdge> filter2Path =
+            new EdgePath<>( 2, edge2, EdgeCursorSerializer.INSTANCE, Optional.of( filter3Path ) );
 
         final EdgePath<Integer> filter1Path = new EdgePath<>( 1, query1, ElasticsearchCursorSerializer.INSTANCE, Optional.of(filter2Path) );
 
-        final EdgePath<Edge> filter0Path = new EdgePath<>( 0, edge1, EdgeCursorSerializer.INSTANCE, Optional.of( filter1Path ) );
+        final EdgePath<MarkedEdge> filter0Path = new EdgePath<>( 0, edge1, EdgeCursorSerializer.INSTANCE, Optional.of( filter1Path ) );
 
 
 
@@ -91,7 +87,7 @@ public class CursorTest {
 
         assertEquals(query2, parsedQuery2);
 
-        final Edge parsedEdge2 = requestCursor.getCursor( 2, EdgeCursorSerializer.INSTANCE );
+        final MarkedEdge parsedEdge2 = requestCursor.getCursor( 2, EdgeCursorSerializer.INSTANCE );
 
         assertEquals( edge2, parsedEdge2 );
 
@@ -100,7 +96,7 @@ public class CursorTest {
         assertEquals( query1, parsedQuery1 );
 
 
-        final Edge parsedEdge1 = requestCursor.getCursor( 0, EdgeCursorSerializer.INSTANCE );
+        final MarkedEdge parsedEdge1 = requestCursor.getCursor( 0, EdgeCursorSerializer.INSTANCE );
 
         assertEquals(edge1, parsedEdge1);
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
index 326e128..6929d87 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
@@ -36,6 +36,7 @@ import org.apache.usergrid.persistence.core.test.UseModules;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
@@ -118,7 +119,7 @@ public class ConnectionServiceImplTest {
             new SimpleSearchByEdge( source, connectionEdge.getType(), target, Long.MAX_VALUE,
                 SearchByEdgeType.Order.DESCENDING, Optional.absent() );
 
-        final List<Edge> edges = gm.loadEdgeVersions( simpleSearchByEdge ).toList().toBlocking().last();
+        final List<MarkedEdge> edges = gm.loadEdgeVersions( simpleSearchByEdge ).toList().toBlocking().last();
 
         assertEquals( 1, edges.size() );
 
@@ -209,7 +210,7 @@ public class ConnectionServiceImplTest {
                 SearchByEdgeType.Order.DESCENDING, Optional.absent() );
 
         //check only 1 exists
-        final List<Edge> edges = gm.loadEdgeVersions( simpleSearchByEdge ).toList().toBlocking().last();
+        final List<MarkedEdge> edges = gm.loadEdgeVersions( simpleSearchByEdge ).toList().toBlocking().last();
 
         assertEquals( 1, edges.size() );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
index d870114..658d3eb 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
@@ -30,6 +30,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
@@ -86,7 +87,8 @@ public class ApplicationServiceIT extends AbstractCoreIT {
             , Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
             Optional.<Edge>absent() );
 
-        Iterator<Edge> results = graphManager.loadEdgesFromSource(simpleSearchByEdgeType).toBlocking().getIterator();
+        Iterator<MarkedEdge>
+            results = graphManager.loadEdgesFromSource(simpleSearchByEdgeType).toBlocking().getIterator();
         if(results.hasNext()){
             Assert.fail("should be empty");
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
index 6100725..000c633 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
@@ -59,7 +59,7 @@ public interface GraphManager extends CPManager {
      * Create or update an edge.  Note that the implementation should also create incoming (reversed) edges for this
      * edge.
      */
-    Observable<Edge> writeEdge( Edge edge );
+    Observable<MarkedEdge> writeEdge( Edge edge );
 
 
     /**
@@ -68,7 +68,7 @@ public interface GraphManager extends CPManager {
      *
      * Implementation should also mark the incoming (reversed) edge. Only marks the specific version
      */
-    Observable<Edge> markEdge( Edge edge );
+    Observable<MarkedEdge> markEdge( Edge edge );
 
     /**
      * @param edge Remove the edge in the graph
@@ -98,7 +98,7 @@ public interface GraphManager extends CPManager {
     /**
      * Get all versions of this edge where versions <= max version
      */
-    Observable<Edge> loadEdgeVersions( SearchByEdge edge );
+    Observable<MarkedEdge> loadEdgeVersions( SearchByEdge edge );
 
     /**
      * Returns an observable that emits all edges where the specified node is the source node. The edges will match the
@@ -108,7 +108,7 @@ public interface GraphManager extends CPManager {
      *
      * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
      */
-    Observable<Edge> loadEdgesFromSource( SearchByEdgeType search );
+    Observable<MarkedEdge> loadEdgesFromSource( SearchByEdgeType search );
 
     /**
      * Returns an observable that emits all edges where the specified node is the target node. The edges will match the
@@ -118,7 +118,7 @@ public interface GraphManager extends CPManager {
      *
      * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
      */
-    Observable<Edge> loadEdgesToTarget( SearchByEdgeType search );
+    Observable<MarkedEdge> loadEdgesToTarget( SearchByEdgeType search );
 
 
     /**
@@ -129,7 +129,7 @@ public interface GraphManager extends CPManager {
      *
      * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
      */
-    Observable<Edge> loadEdgesFromSourceByType( SearchByIdType search );
+    Observable<MarkedEdge> loadEdgesFromSourceByType( SearchByIdType search );
 
 
     /**
@@ -140,7 +140,7 @@ public interface GraphManager extends CPManager {
      *
      * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
      */
-    Observable<Edge> loadEdgesToTargetByType( SearchByIdType search );
+    Observable<MarkedEdge> loadEdgesToTargetByType( SearchByIdType search );
 
     /**
      * Get all edge types to this node.  The node provided by search is the target node.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
index da6fedb..88809e2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
@@ -30,6 +30,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
  * An edge.  With the additional info of if it is marked for deletion
  *
  */
+@JsonDeserialize(as = SimpleMarkedEdge.class)
 public interface MarkedEdge extends Edge{
 
     /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 1bcb398..93ae753 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -144,12 +144,12 @@ public class GraphManagerImpl implements GraphManager {
 
 
     @Override
-    public Observable<Edge> writeEdge( final Edge edge ) {
+    public Observable<MarkedEdge> writeEdge( final Edge edge ) {
         GraphValidation.validateEdge( edge );
 
         final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, false );
 
-        final Observable<Edge> observable = Observable.just( markedEdge ).map( edge1 -> {
+        final Observable<MarkedEdge> observable = Observable.just( markedEdge ).map( edge1 -> {
 
             final UUID timestamp = UUIDGenerator.newTimeUUID();
 
@@ -175,12 +175,12 @@ public class GraphManagerImpl implements GraphManager {
 
 
     @Override
-    public Observable<Edge> markEdge( final Edge edge ) {
+    public Observable<MarkedEdge> markEdge( final Edge edge ) {
         GraphValidation.validateEdge( edge );
 
         final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, true );
 
-        final Observable<Edge> observable = Observable.just( markedEdge ).map( edge1 -> {
+        final Observable<MarkedEdge> observable = Observable.just( markedEdge ).map( edge1 -> {
 
             final UUID timestamp = UUIDGenerator.newTimeUUID();
 
@@ -282,9 +282,9 @@ public class GraphManagerImpl implements GraphManager {
 
 
     @Override
-    public Observable<Edge> loadEdgeVersions( final SearchByEdge searchByEdge ) {
+    public Observable<MarkedEdge> loadEdgeVersions( final SearchByEdge searchByEdge ) {
 
-        final Observable<Edge> edges =
+        final Observable<MarkedEdge> edges =
             Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
@@ -298,8 +298,8 @@ public class GraphManagerImpl implements GraphManager {
 
 
     @Override
-    public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
-        final Observable<Edge> edges =
+    public Observable<MarkedEdge> loadEdgesFromSource( final SearchByEdgeType search ) {
+        final Observable<MarkedEdge> edges =
             Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesFromSource" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
@@ -313,8 +313,8 @@ public class GraphManagerImpl implements GraphManager {
 
 
     @Override
-    public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
-        final Observable<Edge> edges =
+    public Observable<MarkedEdge> loadEdgesToTarget( final SearchByEdgeType search ) {
+        final Observable<MarkedEdge> edges =
             Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesToTarget" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
@@ -329,8 +329,8 @@ public class GraphManagerImpl implements GraphManager {
 
 
     @Override
-    public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
-        final Observable<Edge> edges =
+    public Observable<MarkedEdge> loadEdgesFromSourceByType( final SearchByIdType search ) {
+        final Observable<MarkedEdge> edges =
             Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesFromSourceByType" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
@@ -344,8 +344,8 @@ public class GraphManagerImpl implements GraphManager {
 
 
     @Override
-    public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
-        final Observable<Edge> edges =
+    public Observable<MarkedEdge> loadEdgesToTargetByType( final SearchByIdType search ) {
+        final Observable<MarkedEdge> edges =
             Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesToTargetByType" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
@@ -480,7 +480,7 @@ public class GraphManagerImpl implements GraphManager {
                     }
 
                     //if any one of these is true, we filter it
-                    return !(simpleMarkedEdge.isDeleted() || simpleMarkedEdge.isSourceNodeDelete() || simpleMarkedEdge.isTargetNodeDeleted());
+                    return !simpleMarkedEdge.isDeleted() &&  !simpleMarkedEdge.isSourceNodeDelete() && !simpleMarkedEdge.isTargetNodeDeleted();
                 });
             } );
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
index c6dc2e4..9c35e2e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
@@ -29,50 +29,61 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 
 /**
  * Simple bean to represent our edge
+ *
  * @author tnine
  */
-public class SimpleMarkedEdge extends  SimpleEdge implements MarkedEdge {
+public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge {
+
+    private boolean isDeleted;
+    private boolean isSourceNodeDeleted;
+    private boolean isTargetNodeDeleted;
 
-    private final boolean deleted;
-    private final boolean isSourceNodeDeleted;
-    private final boolean isTargetNodeDeleted;
 
+    /**
+     * Unused but required for Jackson
+     */
+    public SimpleMarkedEdge() {
+    }
 
-    public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp, final boolean deleted) {
 
-        this( sourceNode, type, targetNode, timestamp, deleted, false, false );
+    public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp,
+                             final boolean isDeleted ) {
+
+        this( sourceNode, type, targetNode, timestamp, isDeleted, false, false );
     }
 
 
     public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp,
-                             final boolean deleted, final boolean isSourceNodeDeleted,
+                             final boolean isDeleted, final boolean isSourceNodeDeleted,
                              final boolean isTargetNodeDeleted ) {
         super( sourceNode, type, targetNode, timestamp );
-        this.deleted = deleted;
+        this.isDeleted = isDeleted;
         this.isSourceNodeDeleted = isSourceNodeDeleted;
         this.isTargetNodeDeleted = isTargetNodeDeleted;
     }
 
 
-    public SimpleMarkedEdge(final Edge edge, final boolean deleted){
-        this(edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), deleted);
+    public SimpleMarkedEdge( final Edge edge, final boolean isDeleted ) {
+        this( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), isDeleted );
     }
 
 
     @Override
     @JsonIgnore
     public boolean isDeleted() {
-        return deleted;
+        return isDeleted;
     }
 
 
     @Override
+    @JsonIgnore
     public boolean isSourceNodeDelete() {
         return isSourceNodeDeleted;
     }
 
 
     @Override
+    @JsonIgnore
     public boolean isTargetNodeDeleted() {
         return isTargetNodeDeleted;
     }
@@ -92,7 +103,7 @@ public class SimpleMarkedEdge extends  SimpleEdge implements MarkedEdge {
 
         final SimpleMarkedEdge that = ( SimpleMarkedEdge ) o;
 
-        if ( deleted != that.deleted ) {
+        if ( isDeleted != that.isDeleted ) {
             return false;
         }
 
@@ -103,7 +114,7 @@ public class SimpleMarkedEdge extends  SimpleEdge implements MarkedEdge {
     @Override
     public int hashCode() {
         int result = super.hashCode();
-        result = 31 * result + ( deleted ? 1 : 0 );
+        result = 31 * result + ( isDeleted ? 1 : 0 );
         result = 31 * result + ( isSourceNodeDeleted ? 1 : 0 );
         result = 31 * result + ( isTargetNodeDeleted ? 1 : 0 );
         return result;
@@ -113,7 +124,7 @@ public class SimpleMarkedEdge extends  SimpleEdge implements MarkedEdge {
     @Override
     public String toString() {
         return "SimpleMarkedEdge{" +
-            "deleted=" + deleted +
+            "deleted=" + isDeleted +
             ", isSourceNodeDeleted=" + isSourceNodeDeleted +
             ", isTargetNodeDeleted=" + isTargetNodeDeleted +
             "} " + super.toString();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
index eda3a02..9a95c26 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
@@ -95,7 +95,7 @@ public class GraphManagerIT {
 
         SearchByEdgeType search = createSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTimestamp(), null );
 
-        Observable<Edge> edges = gm.loadEdgesFromSource( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesFromSource( search );
 
         //implicitly blows up if more than 1 is returned from "single"
         Edge returned = edges.toBlocking().last();
@@ -127,7 +127,7 @@ public class GraphManagerIT {
 
         SearchByEdgeType search = createSearchByEdge( edge.getTargetNode(), edge.getType(), edge.getTimestamp(), null );
 
-        Observable<Edge> edges = gm.loadEdgesToTarget( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesToTarget( search );
 
         //implicitly blows up if more than 1 is returned from "single"
         Edge returned = edges.toBlocking().single();
@@ -161,7 +161,7 @@ public class GraphManagerIT {
 
         SearchByEdgeType search = createSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTimestamp(), null );
 
-        Observable<Edge> edges = gm.loadEdgesFromSource( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesFromSource( search );
 
         //implicitly blows up if more than 1 is returned from "single"
         Edge returned = edges.toBlocking().single();
@@ -196,7 +196,7 @@ public class GraphManagerIT {
 
         SearchByEdgeType search = createSearchByEdge( edge.getTargetNode(), edge.getType(), edge.getTimestamp(), null );
 
-        Observable<Edge> edges = gm.loadEdgesToTarget( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesToTarget( search );
 
         //implicitly blows up if more than 1 is returned from "single"
         Edge returned = edges.toBlocking().single();
@@ -248,10 +248,10 @@ public class GraphManagerIT {
         SearchByEdgeType search =
             createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getTimestamp(), null );
 
-        Observable<Edge> edges = gm.loadEdgesFromSource( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesFromSource( search );
 
         //implicitly blows up if more than 1 is returned from "single"
-        Iterator<Edge> returned = edges.toBlocking().getIterator();
+        Iterator<MarkedEdge> returned = edges.toBlocking().getIterator();
 
         assertEquals( "Correct edge returned", edge3, returned.next() );
         assertEquals( "Correct edge returned", edge2, returned.next() );
@@ -321,10 +321,10 @@ public class GraphManagerIT {
         SearchByEdgeType search =
             createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getTimestamp(), null );
 
-        Observable<Edge> edges = gm.loadEdgesToTarget( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesToTarget( search );
 
         //implicitly blows up if more than 1 is returned from "single"
-        Iterator<Edge> returned = edges.toBlocking().getIterator();
+        Iterator<MarkedEdge> returned = edges.toBlocking().getIterator();
 
         assertEquals( "Correct edge returned", edge3, returned.next() );
         assertEquals( "Correct edge returned", edge2, returned.next() );
@@ -387,10 +387,10 @@ public class GraphManagerIT {
         SearchByEdgeType search =
             createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getTimestamp(), null );
 
-        Observable<Edge> edges = gm.loadEdgesFromSource( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesFromSource( search );
 
         //implicitly blows up if more than 1 is returned from "single"
-        Iterator<Edge> returned = edges.toBlocking().getIterator();
+        Iterator<MarkedEdge> returned = edges.toBlocking().getIterator();
 
 
         //we have 3 edges, but we specified our first edge as the max, we shouldn't get any more results than the first
@@ -443,10 +443,10 @@ public class GraphManagerIT {
         SearchByEdgeType search =
             createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getTimestamp(), null );
 
-        Observable<Edge> edges = gm.loadEdgesToTarget( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesToTarget( search );
 
         //implicitly blows up if more than 1 is returned from "single"
-        Iterator<Edge> returned = edges.toBlocking().getIterator();
+        Iterator<MarkedEdge> returned = edges.toBlocking().getIterator();
 
 
         //we have 3 edges, but we specified our first edge as the max, we shouldn't get any more results than the first
@@ -487,7 +487,7 @@ public class GraphManagerIT {
         SearchByIdType search = createSearchByEdgeAndId( edge.getSourceNode(), edge.getType(), edge.getTimestamp(),
             edge.getTargetNode().getType(), null );
 
-        Observable<Edge> edges = gm.loadEdgesFromSourceByType( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesFromSourceByType( search );
 
         //implicitly blows up if more than 1 is returned from "single"
         Edge returned = edges.toBlocking().single();
@@ -524,7 +524,7 @@ public class GraphManagerIT {
         SearchByIdType search = createSearchByEdgeAndId( edge.getTargetNode(), edge.getType(), edge.getTimestamp(),
             edge.getSourceNode().getType(), null );
 
-        Observable<Edge> edges = gm.loadEdgesToTargetByType( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesToTargetByType( search );
 
         //implicitly blows up if more than 1 is returned from "single"
         Edge returned = edges.toBlocking().single();
@@ -560,7 +560,7 @@ public class GraphManagerIT {
 
         SearchByEdgeType search = createSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTimestamp(), null );
 
-        Observable<Edge> edges = gm.loadEdgesFromSource( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesFromSource( search );
 
         //implicitly blows up if more than 1 is returned from "single"
         Edge returned = edges.toBlocking().single();
@@ -629,7 +629,7 @@ public class GraphManagerIT {
 
         SearchByEdgeType search = createSearchByEdge( edge.getTargetNode(), edge.getType(), edge.getTimestamp(), null );
 
-        Observable<Edge> edges = gm.loadEdgesToTarget( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesToTarget( search );
 
         //implicitly blows up if more than 1 is returned from "single"
         Edge returned = edges.toBlocking().single();
@@ -988,11 +988,11 @@ public class GraphManagerIT {
 
 
         //get our 2 edges
-        Observable<Edge> edges =
+        Observable<MarkedEdge> edges =
             gm.loadEdgesFromSource( createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
 
 
-        Iterator<Edge> results = edges.toBlocking().getIterator();
+        Iterator<MarkedEdge> results = edges.toBlocking().getIterator();
 
 
         assertEquals( "Edges correct", edge2, results.next() );
@@ -1060,11 +1060,11 @@ public class GraphManagerIT {
 
 
         //get our 2 edges
-        Observable<Edge> edges =
+        Observable<MarkedEdge> edges =
             gm.loadEdgesToTarget( createSearchByEdge( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) );
 
 
-        Iterator<Edge> results = edges.toBlocking().getIterator();
+        Iterator<MarkedEdge> results = edges.toBlocking().getIterator();
 
 
         assertEquals( "Edges correct", edge2, results.next() );
@@ -1140,11 +1140,11 @@ public class GraphManagerIT {
 
 
         //get our 2 edges
-        Observable<Edge> edges = gm.loadEdgesFromSource(
+        Observable<MarkedEdge> edges = gm.loadEdgesFromSource(
             createSearchByEdgeUnfiltered( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
 
 
-        Iterator<Edge> results = edges.toBlocking().getIterator();
+        Iterator<MarkedEdge> results = edges.toBlocking().getIterator();
 
 
         assertEquals( "Edges correct", edge2.getTargetNode(), results.next().getTargetNode() );
@@ -1211,11 +1211,11 @@ public class GraphManagerIT {
         gm.markEdge( edge1 ).toBlocking().last();
 
         //get our 2 edges
-        Observable<Edge> edges = gm.loadEdgesToTarget(
+        Observable<MarkedEdge> edges = gm.loadEdgesToTarget(
             createSearchByEdgeUnfiltered( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) );
 
 
-        Iterator<Edge> results = edges.toBlocking().getIterator();
+        Iterator<MarkedEdge> results = edges.toBlocking().getIterator();
 
 
         assertEquals( "Edges correct", edge2.getSourceNode(), results.next().getSourceNode() );
@@ -1282,11 +1282,11 @@ public class GraphManagerIT {
 
 
         //get our 2 edges
-        Observable<Edge> edges = gm.loadEdgesFromSourceByType(
+        Observable<MarkedEdge> edges = gm.loadEdgesFromSourceByType(
             createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) );
 
 
-        Iterator<Edge> results = edges.toBlocking().getIterator();
+        Iterator<MarkedEdge> results = edges.toBlocking().getIterator();
 
 
         assertEquals( "Edges correct", edge1, results.next() );
@@ -1359,11 +1359,11 @@ public class GraphManagerIT {
         final long maxVersion = System.currentTimeMillis();
 
         //get our 2 edges
-        Observable<Edge> edges = gm.loadEdgesToTargetByType(
+        Observable<MarkedEdge> edges = gm.loadEdgesToTargetByType(
             createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) );
 
 
-        Iterator<Edge> results = edges.toBlocking().getIterator();
+        Iterator<MarkedEdge> results = edges.toBlocking().getIterator();
 
 
         assertEquals( "Edges correct", edge1, results.next() );
@@ -1435,7 +1435,7 @@ public class GraphManagerIT {
 
         final long maxVersion = System.currentTimeMillis();
 
-        Iterator<Edge> results =
+        Iterator<MarkedEdge> results =
             gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking()
               .getIterator();
 
@@ -1547,7 +1547,7 @@ public class GraphManagerIT {
 
         final long maxVersion = System.currentTimeMillis();
 
-        Iterator<Edge> results =
+        Iterator<MarkedEdge> results =
             gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking()
               .getIterator();
 
@@ -1627,7 +1627,7 @@ public class GraphManagerIT {
 
         final long maxVersion = System.currentTimeMillis();
 
-        Iterator<Edge> results =
+        Iterator<MarkedEdge> results =
             gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking()
               .getIterator();
 
@@ -1764,7 +1764,7 @@ public class GraphManagerIT {
 
         final long maxVersion = System.currentTimeMillis();
 
-        Iterator<Edge> results =
+        Iterator<MarkedEdge> results =
             gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking()
               .getIterator();
 
@@ -1842,7 +1842,7 @@ public class GraphManagerIT {
 
         final long maxVersion = System.currentTimeMillis();
 
-        Iterator<Edge> results =
+        Iterator<MarkedEdge> results =
             gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking()
               .getIterator();
 
@@ -1983,7 +1983,7 @@ public class GraphManagerIT {
 
         final long maxVersion = System.currentTimeMillis();
 
-        Iterator<Edge> results =
+        Iterator<MarkedEdge> results =
             gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking()
               .getIterator();
 
@@ -2356,10 +2356,10 @@ public class GraphManagerIT {
             new SimpleSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getTargetNode(), Long.MAX_VALUE,
                 SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()   );
 
-        final Observable<Edge> edgesDescending = gm.loadEdgeVersions( searchDescending );
+        final Observable<MarkedEdge> edgesDescending = gm.loadEdgeVersions( searchDescending );
 
         //search descending
-        final List<Edge> descending = edgesDescending.toList().toBlocking().single();
+        final List<MarkedEdge> descending = edgesDescending.toList().toBlocking().single();
 
         assertEquals( "Correct size returned", 3, descending.size() );
 
@@ -2376,9 +2376,9 @@ public class GraphManagerIT {
                     new SimpleSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getTargetNode(), 0,
                         SearchByEdgeType.Order.ASCENDING, Optional.<Edge>absent()   );
 
-        Observable<Edge> edgesAscending = gm.loadEdgeVersions( searchAscending );
+        Observable<MarkedEdge> edgesAscending = gm.loadEdgeVersions( searchAscending );
 
-        List<Edge> ascending = edgesAscending.toList().toBlocking().single();
+        List<MarkedEdge> ascending = edgesAscending.toList().toBlocking().single();
 
         assertEquals( "Correct size returned", 3, ascending.size() );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java
index b922e7c..22683f6 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java
@@ -113,7 +113,7 @@ public class GraphManagerLoadTest {
 
 
             @Override
-            public Observable<Edge> doSearch( final GraphManager manager ) {
+            public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
                  return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING,  Optional
                                       .<Edge>absent()) );
             }
@@ -141,7 +141,7 @@ public class GraphManagerLoadTest {
 
 
             @Override
-            public Observable<Edge> doSearch( final GraphManager manager ) {
+            public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
                 return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING,  Optional.<Edge>absent() ) );
             }
         };
@@ -220,7 +220,7 @@ public class GraphManagerLoadTest {
             final CountDownLatch latch = new CountDownLatch( 1 );
 
 
-            generator.doSearch( manager ).take( readCount ).buffer( 1000 ).subscribe( new Subscriber<List<Edge>>() {
+            generator.doSearch( manager ).take( readCount ).buffer( 1000 ).subscribe( new Subscriber<List<MarkedEdge>>() {
                 @Override
                 public void onCompleted() {
                     timer.stop();
@@ -235,7 +235,7 @@ public class GraphManagerLoadTest {
 
 
                 @Override
-                public void onNext( final List<Edge> edges ) {
+                public void onNext( final List<MarkedEdge> edges ) {
                     log.info("Read {} edges", edges.size());
                 }
             } );
@@ -263,6 +263,6 @@ public class GraphManagerLoadTest {
          * @param manager
          * @return
          */
-        public Observable<Edge> doSearch( final GraphManager manager );
+        public Observable<MarkedEdge> doSearch( final GraphManager manager );
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index c1917bb..6aad289 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -176,7 +176,7 @@ public class GraphManagerShardConsistencyIT {
 
 
             @Override
-            public Observable<Edge> doSearch( final GraphManager manager ) {
+            public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
                 return manager.loadEdgesFromSource(
                     new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
                         Optional.<Edge>absent() ) );
@@ -409,7 +409,7 @@ public class GraphManagerShardConsistencyIT {
 
 
             @Override
-            public Observable<Edge> doSearch( final GraphManager manager ) {
+            public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
                 return manager.loadEdgesFromSource(
                     new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
                         Optional.<Edge>absent(), false ) );
@@ -729,7 +729,7 @@ public class GraphManagerShardConsistencyIT {
         /**
          * Perform the search returning an observable edge
          */
-        public Observable<Edge> doSearch( final GraphManager manager );
+        public Observable<MarkedEdge> doSearch( final GraphManager manager );
     }
 }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
index 2889684..6a2efc9 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
@@ -107,25 +107,25 @@ public class GraphManagerStressTest {
 
 
             @Override
-            public Observable<Edge> doSearch( final GraphManager manager ) {
+            public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
 
 
                 final long timestamp = System.currentTimeMillis();
 
 
-                return Observable.create( new Observable.OnSubscribe<Edge>() {
+                return Observable.create( new Observable.OnSubscribe<MarkedEdge>() {
 
                     @Override
-                    public void call( final Subscriber<? super Edge> subscriber ) {
+                    public void call( final Subscriber<? super MarkedEdge> subscriber ) {
                         try {
                             for ( Id sourceId : sourceIds ) {
 
-                                final Iterable<Edge> edges = manager.loadEdgesFromSource(
+                                final Iterable<MarkedEdge> edges = manager.loadEdgesFromSource(
                                         new SimpleSearchByEdgeType( sourceId, "test", timestamp, SearchByEdgeType.Order.DESCENDING,  Optional
                                                                                     .<Edge>absent() ) )
                                                                     .toBlocking().toIterable();
 
-                                for ( Edge edge : edges ) {
+                                for ( MarkedEdge edge : edges ) {
                                     log.debug( "Firing on next for edge {}", edge );
 
                                     subscriber.onNext( edge );
@@ -195,7 +195,7 @@ public class GraphManagerStressTest {
 
 
             @Override
-            public Observable<Edge> doSearch( final GraphManager manager ) {
+            public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
                 return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING,  Optional.<Edge>absent() ) );
             }
         };
@@ -222,7 +222,7 @@ public class GraphManagerStressTest {
 
 
             @Override
-            public Observable<Edge> doSearch( final GraphManager manager ) {
+            public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
 
                 return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING,  Optional.<Edge>absent() ) );
             }
@@ -307,6 +307,6 @@ public class GraphManagerStressTest {
          */
         public Edge newEdge();
 
-        public Observable<Edge> doSearch( final GraphManager manager );
+        public Observable<MarkedEdge> doSearch( final GraphManager manager );
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java b/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
index d9d3d0d..6e84601 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
@@ -47,6 +47,7 @@ import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsExcept
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
@@ -239,7 +240,7 @@ public class AppInfoMigrationPlugin implements MigrationPlugin {
         final EntityCollectionManager managementCollectionManager =
             entityCollectionManagerFactory.createCollectionManager( managementAppScope );
 
-        Observable<Edge> edgesObservable = getApplicationInfoEdges( appId );
+        Observable<MarkedEdge> edgesObservable = getApplicationInfoEdges( appId );
         //get the graph for all app infos
         Observable<org.apache.usergrid.persistence.model.entity.Entity> entityObs = edgesObservable.flatMap( edge -> {
             final Id appInfoId = edge.getTargetNode();
@@ -299,7 +300,7 @@ public class AppInfoMigrationPlugin implements MigrationPlugin {
     }
 
 
-    public Observable<Edge> getApplicationInfoEdges( final UUID applicationId ) {
+    public Observable<MarkedEdge> getApplicationInfoEdges( final UUID applicationId ) {
         final ApplicationScope managementAppScope = getApplicationScope( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
         final GraphManager gm = graphManagerFactory.createEdgeManager( managementAppScope );