You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2015/10/26 17:58:14 UTC

[4/7] usergrid git commit: Changed the interface to return MarkedEdge interface. This allows the caller to determine if they want to filter or not, as well as to perform read-repair actions on read.

Changed the GraphManager interface to return MarkedEdge instead of Edge.  This allows the caller to decide whether or not to filter marked edges, as well as to perform read-repair actions on read.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1d4f22c5
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1d4f22c5
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1d4f22c5

Branch: refs/heads/2.1-release
Commit: 1d4f22c59c057a6f9d52844e03897786b0773a5b
Parents: 4e51d38
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Oct 23 15:25:39 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Oct 23 15:25:39 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManagerFactory.java |  7 +-
 .../corepersistence/CpRelationManager.java      | 16 ++---
 .../read/traverse/AbstractReadGraphFilter.java  | 74 +++++++++++++++----
 .../read/traverse/EdgeCursorSerializer.java     |  8 ++-
 .../traverse/ReadGraphCollectionFilter.java     |  5 +-
 .../ReadGraphConnectionByTypeFilter.java        | 11 +--
 .../traverse/ReadGraphConnectionFilter.java     |  5 +-
 .../index/AsyncIndexServiceTest.java            |  3 +-
 .../corepersistence/index/IndexServiceTest.java | 12 ++--
 .../pipeline/cursor/CursorTest.java             | 24 +++----
 .../service/ConnectionServiceImplTest.java      |  5 +-
 .../persistence/ApplicationServiceIT.java       |  4 +-
 .../persistence/graph/GraphManager.java         | 14 ++--
 .../usergrid/persistence/graph/MarkedEdge.java  |  1 +
 .../graph/impl/GraphManagerImpl.java            | 30 ++++----
 .../graph/impl/SimpleMarkedEdge.java            | 39 ++++++----
 .../persistence/graph/GraphManagerIT.java       | 76 ++++++++++----------
 .../persistence/graph/GraphManagerLoadTest.java | 10 +--
 .../graph/GraphManagerShardConsistencyIT.java   |  6 +-
 .../graph/GraphManagerStressTest.java           | 16 ++---
 .../management/AppInfoMigrationPlugin.java      |  5 +-
 21 files changed, 220 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 4bdade5..c75a025 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -63,6 +63,7 @@ import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.index.EntityIndex;
@@ -477,9 +478,9 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         logger.debug("getApplications(): Loading edges of edgeType {} from {}:{}",
             new Object[]{edgeType, managementId.getType(), managementId.getUuid()});
 
-        Observable<Edge> edges = gm.loadEdgesFromSource( new SimpleSearchByEdgeType(
-                managementId, edgeType, Long.MAX_VALUE,
-                SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ));
+        Observable<MarkedEdge> edges = gm.loadEdgesFromSource(
+            new SimpleSearchByEdgeType( managementId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                Optional.<Edge>absent() ) );
 
         final EntityCollectionManager ecm = managerCache.getEntityCollectionManager( appScope );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index aad7610..b4cabc4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -59,6 +59,7 @@ import org.apache.usergrid.persistence.entities.Group;
 import org.apache.usergrid.persistence.entities.User;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
@@ -227,14 +228,9 @@ public class CpRelationManager implements RelationManager {
 
         Observable<Edge> edges =
             gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeType, null ) )
-              .flatMap( new Func1<String, Observable<Edge>>() {
-                  @Override
-                  public Observable<Edge> call( final String edgeType ) {
-                      return gm.loadEdgesToTarget(
-                          new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType, Long.MAX_VALUE,
-                              SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) );
-                  }
-              } );
+              .flatMap( edgeType1 -> gm.loadEdgesToTarget(
+                  new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType1, Long.MAX_VALUE,
+                      SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ) );
 
         //if our limit is set, take them.  Note this logic is still borked, we can't possibly fit everything in memmory
         if ( limit > -1 ) {
@@ -268,7 +264,7 @@ public class CpRelationManager implements RelationManager {
         } );
 
         GraphManager gm = managerCache.getGraphManager( applicationScope );
-        Observable<Edge> edges = gm.loadEdgeVersions( CpNamingUtils
+        Observable<MarkedEdge> edges = gm.loadEdgeVersions( CpNamingUtils
             .createEdgeFromConnectionType( new SimpleId( headEntity.getUuid(), headEntity.getType() ), connectionType,
                 entityId ) );
 
@@ -288,7 +284,7 @@ public class CpRelationManager implements RelationManager {
         } );
 
         GraphManager gm = managerCache.getGraphManager( applicationScope );
-        Observable<Edge> edges = gm.loadEdgeVersions( CpNamingUtils
+        Observable<MarkedEdge> edges = gm.loadEdgeVersions( CpNamingUtils
             .createEdgeFromCollectionName( new SimpleId( headEntity.getUuid(), headEntity.getType() ), collectionName,
                 entityId ) );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index d3e0345..621edd2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -23,13 +23,16 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
 import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -42,18 +45,21 @@ import rx.Observable;
 /**
  * Command for reading graph edges
  */
-public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, Edge> {
+public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, MarkedEdge> {
 
     private static final Logger logger = LoggerFactory.getLogger( AbstractReadGraphFilter.class );
 
     private final GraphManagerFactory graphManagerFactory;
+    private final AsyncEventService asyncEventService;
 
 
     /**
      * Create a new instance of our command
      */
-    public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory ) {
+    public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory,
+                                    final AsyncEventService asyncEventService ) {
         this.graphManagerFactory = graphManagerFactory;
+        this.asyncEventService = asyncEventService;
     }
 
 
@@ -61,9 +67,11 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
     public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> previousIds ) {
 
 
+        final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
+
         //get the graph manager
         final GraphManager graphManager =
-            graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
+            graphManagerFactory.createEdgeManager( applicationScope );
 
 
         final String edgeName = getEdgeTypeName();
@@ -74,18 +82,60 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
         return previousIds.flatMap( previousFilterValue -> {
 
             //set our our constant state
-            final Optional<Edge> startFromCursor = getSeekValue();
+            final Optional<MarkedEdge> startFromCursor = getSeekValue();
             final Id id = previousFilterValue.getValue();
 
 
+            final Optional<Edge> typeWrapper = Optional.fromNullable(startFromCursor.orNull());
+
+            /**
+             * We do not want the search to filter marked edges.  This is intentional, DO NOT REMOVE!!!
+             *
+             * If marked edges still exist, a delete was missed, and we want to fire events on them.
+             */
             final SimpleSearchByEdgeType search =
                 new SimpleSearchByEdgeType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                    startFromCursor );
+                    typeWrapper, false );
 
             /**
              * TODO, pass a message with pointers to our cursor values to be generated later
              */
-            return graphManager.loadEdgesFromSource( search )
+            return graphManager.loadEdgesFromSource( search ).filter(markedEdge -> {
+
+                final boolean isDeleted = markedEdge.isDeleted();
+                final boolean isSourceNodeDeleted = markedEdge.isSourceNodeDelete();
+                final boolean isTargetNodeDelete = markedEdge.isTargetNodeDeleted();
+
+
+
+                if(isDeleted){
+                    logger.trace( "Edge {} is deleted, queueing the delete event", markedEdge );
+                    asyncEventService.queueDeleteEdge( applicationScope, markedEdge  );
+                }
+
+                if(isSourceNodeDeleted){
+                    final Id sourceNodeId = markedEdge.getSourceNode();
+
+                    logger.trace( "Edge {} has a deleted source node, queueing the delete entity event for id {}", markedEdge, sourceNodeId );
+
+                    asyncEventService.queueEntityDelete( applicationScope, sourceNodeId );
+                }
+
+                if(isTargetNodeDelete){
+
+                    final Id targetNodeId = markedEdge.getTargetNode();
+
+                    logger.trace( "Edge {} has a deleted target node, queueing the delete entity event for id {}", markedEdge, targetNodeId );
+
+                    asyncEventService.queueEntityDelete( applicationScope, targetNodeId );
+                }
+
+
+                //filter if any of them are marked
+                return !isDeleted && !isSourceNodeDeleted && !isTargetNodeDelete;
+
+
+            })
                 //set the edge state for cursors
                 .doOnNext( edge -> {
                     logger.trace( "Seeking over edge {}", edge );
@@ -100,7 +150,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
 
 
     @Override
-    protected FilterResult<Id> createFilterResult( final Id emit, final Edge cursorValue,
+    protected FilterResult<Id> createFilterResult( final Id emit, final MarkedEdge cursorValue,
                                                    final Optional<EdgePath> parent ) {
 
         //if it's our first pass, there's no cursor to generate
@@ -113,7 +163,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
 
 
     @Override
-    protected CursorSerializer<Edge> getCursorSerializer() {
+    protected CursorSerializer<MarkedEdge> getCursorSerializer() {
         return EdgeCursorSerializer.INSTANCE;
     }
 
@@ -131,14 +181,14 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
      */
     private final class EdgeState {
 
-        private Edge cursorEdge = null;
-        private Edge currentEdge = null;
+        private MarkedEdge cursorEdge = null;
+        private MarkedEdge currentEdge = null;
 
 
         /**
          * Update the pointers
          */
-        private void update( final Edge newEdge ) {
+        private void update( final MarkedEdge newEdge ) {
             cursorEdge = currentEdge;
             currentEdge = newEdge;
         }
@@ -147,7 +197,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
         /**
          * Get the edge to use in cursors for resume
          */
-        private Edge getCursorEdge() {
+        private MarkedEdge getCursorEdge() {
             return cursorEdge;
         }
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java
index 8d9bf6f..d54e547 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java
@@ -22,20 +22,22 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer;
 import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
 
 
 /**
  * Edge cursor serializer
  */
-public class EdgeCursorSerializer extends AbstractCursorSerializer<Edge> {
+public class EdgeCursorSerializer extends AbstractCursorSerializer<MarkedEdge> {
 
 
     public static final EdgeCursorSerializer INSTANCE = new EdgeCursorSerializer();
 
     @Override
-    protected Class<SimpleEdge> getType() {
-        return SimpleEdge.class;
+    protected Class<SimpleMarkedEdge> getType() {
+        return SimpleMarkedEdge.class;
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
index dc39f5c..db5a0a8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 
 import com.google.inject.Inject;
@@ -40,8 +41,8 @@ public class ReadGraphCollectionFilter extends AbstractReadGraphFilter {
      * Create a new instance of our command
      */
     @Inject
-    public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String collectionName ) {
-        super( graphManagerFactory );
+    public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final AsyncEventService asyncEventService, @Assisted final String collectionName ) {
+        super( graphManagerFactory, asyncEventService );
         this.collectionName = collectionName;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java
index 61ba4ad..054a52b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java
@@ -26,6 +26,7 @@ import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -42,7 +43,7 @@ import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeType
 /**
  * Command for reading graph edges on a connection
  */
-public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, Edge>{
+public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, MarkedEdge>{
 
     private final GraphManagerFactory graphManagerFactory;
     private final String connectionName;
@@ -77,12 +78,14 @@ public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id,
         return filterResultObservable.flatMap( idFilterResult -> {
 
               //set our our constant state
-            final Optional<Edge> startFromCursor = getSeekValue();
+            final Optional<MarkedEdge> startFromCursor = getSeekValue();
             final Id id = idFilterResult.getValue();
 
+            final Optional<Edge> typeWrapper = Optional.fromNullable(startFromCursor.orNull());
+
             final SimpleSearchByIdType search =
                 new SimpleSearchByIdType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                    entityType, startFromCursor );
+                    entityType, typeWrapper );
 
             return graphManager.loadEdgesFromSourceByType( search ).map(
                 edge -> createFilterResult( edge.getTargetNode(), edge, idFilterResult.getPath() ));
@@ -91,7 +94,7 @@ public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id,
 
 
     @Override
-    protected CursorSerializer<Edge> getCursorSerializer() {
+    protected CursorSerializer<MarkedEdge> getCursorSerializer() {
         return EdgeCursorSerializer.INSTANCE;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
index 11ec5f8..93e8fd4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 
 import com.google.inject.Inject;
@@ -40,8 +41,8 @@ public class ReadGraphConnectionFilter extends AbstractReadGraphFilter {
      * Create a new instance of our command
      */
     @Inject
-    public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String connectionName ) {
-        super( graphManagerFactory );
+    public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory,  final AsyncEventService asyncEventService,  @Assisted final String connectionName ) {
+        super( graphManagerFactory, asyncEventService );
         this.connectionName = connectionName;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
index 2863cbf..74f9ce0 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.index.*;
 import org.junit.Before;
 import org.junit.Rule;
@@ -135,7 +136,7 @@ public abstract class AsyncIndexServiceTest {
          */
 
 
-        final List<Edge> connectionSearchEdges = Observable.range( 0, 500 ).flatMap(integer -> {
+        final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, 500 ).flatMap(integer -> {
             final Id connectingId = createId("connecting");
             final Edge edge = CpNamingUtils.createConnectionEdge(connectingId, "likes", testEntity.getId());
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
index 6001dd4..90d6c5a 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.junit.Before;
 import org.junit.Test;
@@ -244,7 +245,7 @@ public class IndexServiceTest {
 //        final int edgeCount = indexFig.getIndexBatchSize()*2;
         final int edgeCount = 100;
 
-        final List<Edge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> {
+        final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> {
             final Id connectingId = createId( "connecting" );
             final Edge edge = CpNamingUtils.createConnectionEdge( connectingId, "likes", testEntity.getId() );
 
@@ -377,7 +378,8 @@ public class IndexServiceTest {
         //Write multiple connection edges
         final int edgeCount = 5;
 
-        final List<Edge> connectionSearchEdges = createConnectionSearchEdges( testEntity, graphManager, edgeCount );
+        final List<MarkedEdge>
+            connectionSearchEdges = createConnectionSearchEdges( testEntity, graphManager, edgeCount );
 
         indexService.indexEntity( applicationScope, testEntity ).flatMap(mesage -> indexProducer.put(mesage)).toBlocking().getIterator();
 
@@ -485,10 +487,10 @@ public class IndexServiceTest {
     }
 
 
-    private List<Edge> createConnectionSearchEdges(
-        final Entity testEntity, final GraphManager graphManager, final int edgeCount ) {
+    private List<MarkedEdge> createConnectionSearchEdges( final Entity testEntity, final GraphManager graphManager,
+                                                          final int edgeCount ) {
 
-        final List<Edge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> {
+        final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> {
 
             //create our connection edge.
             final Id connectingId = createId( "connecting" );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
index 7128dcf..c9dcbf1 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
@@ -28,7 +28,9 @@ import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
 import org.apache.usergrid.corepersistence.pipeline.read.search.ElasticsearchCursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.traverse.EdgeCursorSerializer;
 import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
 
 import com.google.common.base.Optional;
 
@@ -40,19 +42,12 @@ public class CursorTest {
 
     @Test
     public void testCursors(){
+         //test encoding edge
 
+        final MarkedEdge edge1 = new SimpleMarkedEdge( createId("source1"), "edgeType1",  createId("target1"), 100, false, false, false  );
 
 
-
-
-
-
-        //test encoding edge
-
-        final Edge edge1 = new SimpleEdge( createId("source1"), "edgeType1",  createId("target1"), 100  );
-
-
-        final Edge edge2 = new SimpleEdge( createId("source2"), "edgeType2",  createId("target2"), 110  );
+        final MarkedEdge edge2 = new SimpleMarkedEdge( createId("source2"), "edgeType2",  createId("target2"), 110, false, false, false  );
 
 
 
@@ -64,11 +59,12 @@ public class CursorTest {
 
         final EdgePath<Integer> filter3Path = new EdgePath<>( 3, query2, ElasticsearchCursorSerializer.INSTANCE, Optional.absent() );
 
-        final EdgePath<Edge> filter2Path = new EdgePath<Edge>(2, edge2, EdgeCursorSerializer.INSTANCE, Optional.of( filter3Path ));
+        final EdgePath<MarkedEdge> filter2Path =
+            new EdgePath<>( 2, edge2, EdgeCursorSerializer.INSTANCE, Optional.of( filter3Path ) );
 
         final EdgePath<Integer> filter1Path = new EdgePath<>( 1, query1, ElasticsearchCursorSerializer.INSTANCE, Optional.of(filter2Path) );
 
-        final EdgePath<Edge> filter0Path = new EdgePath<>( 0, edge1, EdgeCursorSerializer.INSTANCE, Optional.of( filter1Path ) );
+        final EdgePath<MarkedEdge> filter0Path = new EdgePath<>( 0, edge1, EdgeCursorSerializer.INSTANCE, Optional.of( filter1Path ) );
 
 
 
@@ -91,7 +87,7 @@ public class CursorTest {
 
         assertEquals(query2, parsedQuery2);
 
-        final Edge parsedEdge2 = requestCursor.getCursor( 2, EdgeCursorSerializer.INSTANCE );
+        final MarkedEdge parsedEdge2 = requestCursor.getCursor( 2, EdgeCursorSerializer.INSTANCE );
 
         assertEquals( edge2, parsedEdge2 );
 
@@ -100,7 +96,7 @@ public class CursorTest {
         assertEquals( query1, parsedQuery1 );
 
 
-        final Edge parsedEdge1 = requestCursor.getCursor( 0, EdgeCursorSerializer.INSTANCE );
+        final MarkedEdge parsedEdge1 = requestCursor.getCursor( 0, EdgeCursorSerializer.INSTANCE );
 
         assertEquals(edge1, parsedEdge1);
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
index 326e128..6929d87 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
@@ -36,6 +36,7 @@ import org.apache.usergrid.persistence.core.test.UseModules;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
@@ -118,7 +119,7 @@ public class ConnectionServiceImplTest {
             new SimpleSearchByEdge( source, connectionEdge.getType(), target, Long.MAX_VALUE,
                 SearchByEdgeType.Order.DESCENDING, Optional.absent() );
 
-        final List<Edge> edges = gm.loadEdgeVersions( simpleSearchByEdge ).toList().toBlocking().last();
+        final List<MarkedEdge> edges = gm.loadEdgeVersions( simpleSearchByEdge ).toList().toBlocking().last();
 
         assertEquals( 1, edges.size() );
 
@@ -209,7 +210,7 @@ public class ConnectionServiceImplTest {
                 SearchByEdgeType.Order.DESCENDING, Optional.absent() );
 
         //check only 1 exists
-        final List<Edge> edges = gm.loadEdgeVersions( simpleSearchByEdge ).toList().toBlocking().last();
+        final List<MarkedEdge> edges = gm.loadEdgeVersions( simpleSearchByEdge ).toList().toBlocking().last();
 
         assertEquals( 1, edges.size() );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
index d870114..658d3eb 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
@@ -30,6 +30,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
@@ -86,7 +87,8 @@ public class ApplicationServiceIT extends AbstractCoreIT {
             , Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
             Optional.<Edge>absent() );
 
-        Iterator<Edge> results = graphManager.loadEdgesFromSource(simpleSearchByEdgeType).toBlocking().getIterator();
+        Iterator<MarkedEdge>
+            results = graphManager.loadEdgesFromSource(simpleSearchByEdgeType).toBlocking().getIterator();
         if(results.hasNext()){
             Assert.fail("should be empty");
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
index 6100725..000c633 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
@@ -59,7 +59,7 @@ public interface GraphManager extends CPManager {
      * Create or update an edge.  Note that the implementation should also create incoming (reversed) edges for this
      * edge.
      */
-    Observable<Edge> writeEdge( Edge edge );
+    Observable<MarkedEdge> writeEdge( Edge edge );
 
 
     /**
@@ -68,7 +68,7 @@ public interface GraphManager extends CPManager {
      *
      * Implementation should also mark the incoming (reversed) edge. Only marks the specific version
      */
-    Observable<Edge> markEdge( Edge edge );
+    Observable<MarkedEdge> markEdge( Edge edge );
 
     /**
      * @param edge Remove the edge in the graph
@@ -98,7 +98,7 @@ public interface GraphManager extends CPManager {
     /**
      * Get all versions of this edge where versions <= max version
      */
-    Observable<Edge> loadEdgeVersions( SearchByEdge edge );
+    Observable<MarkedEdge> loadEdgeVersions( SearchByEdge edge );
 
     /**
      * Returns an observable that emits all edges where the specified node is the source node. The edges will match the
@@ -108,7 +108,7 @@ public interface GraphManager extends CPManager {
      *
      * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
      */
-    Observable<Edge> loadEdgesFromSource( SearchByEdgeType search );
+    Observable<MarkedEdge> loadEdgesFromSource( SearchByEdgeType search );
 
     /**
      * Returns an observable that emits all edges where the specified node is the target node. The edges will match the
@@ -118,7 +118,7 @@ public interface GraphManager extends CPManager {
      *
      * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
      */
-    Observable<Edge> loadEdgesToTarget( SearchByEdgeType search );
+    Observable<MarkedEdge> loadEdgesToTarget( SearchByEdgeType search );
 
 
     /**
@@ -129,7 +129,7 @@ public interface GraphManager extends CPManager {
      *
      * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
      */
-    Observable<Edge> loadEdgesFromSourceByType( SearchByIdType search );
+    Observable<MarkedEdge> loadEdgesFromSourceByType( SearchByIdType search );
 
 
     /**
@@ -140,7 +140,7 @@ public interface GraphManager extends CPManager {
      *
      * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
      */
-    Observable<Edge> loadEdgesToTargetByType( SearchByIdType search );
+    Observable<MarkedEdge> loadEdgesToTargetByType( SearchByIdType search );
 
     /**
      * Get all edge types to this node.  The node provided by search is the target node.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
index da6fedb..88809e2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
@@ -30,6 +30,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
  * An edge.  With the additional info of if it is marked for deletion
  *
  */
+@JsonDeserialize(as = SimpleMarkedEdge.class)
 public interface MarkedEdge extends Edge{
 
     /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 1bcb398..93ae753 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -144,12 +144,12 @@ public class GraphManagerImpl implements GraphManager {
 
 
     @Override
-    public Observable<Edge> writeEdge( final Edge edge ) {
+    public Observable<MarkedEdge> writeEdge( final Edge edge ) {
         GraphValidation.validateEdge( edge );
 
         final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, false );
 
-        final Observable<Edge> observable = Observable.just( markedEdge ).map( edge1 -> {
+        final Observable<MarkedEdge> observable = Observable.just( markedEdge ).map( edge1 -> {
 
             final UUID timestamp = UUIDGenerator.newTimeUUID();
 
@@ -175,12 +175,12 @@ public class GraphManagerImpl implements GraphManager {
 
 
     @Override
-    public Observable<Edge> markEdge( final Edge edge ) {
+    public Observable<MarkedEdge> markEdge( final Edge edge ) {
         GraphValidation.validateEdge( edge );
 
         final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, true );
 
-        final Observable<Edge> observable = Observable.just( markedEdge ).map( edge1 -> {
+        final Observable<MarkedEdge> observable = Observable.just( markedEdge ).map( edge1 -> {
 
             final UUID timestamp = UUIDGenerator.newTimeUUID();
 
@@ -282,9 +282,9 @@ public class GraphManagerImpl implements GraphManager {
 
 
     @Override
-    public Observable<Edge> loadEdgeVersions( final SearchByEdge searchByEdge ) {
+    public Observable<MarkedEdge> loadEdgeVersions( final SearchByEdge searchByEdge ) {
 
-        final Observable<Edge> edges =
+        final Observable<MarkedEdge> edges =
             Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
@@ -298,8 +298,8 @@ public class GraphManagerImpl implements GraphManager {
 
 
     @Override
-    public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
-        final Observable<Edge> edges =
+    public Observable<MarkedEdge> loadEdgesFromSource( final SearchByEdgeType search ) {
+        final Observable<MarkedEdge> edges =
             Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesFromSource" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
@@ -313,8 +313,8 @@ public class GraphManagerImpl implements GraphManager {
 
 
     @Override
-    public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
-        final Observable<Edge> edges =
+    public Observable<MarkedEdge> loadEdgesToTarget( final SearchByEdgeType search ) {
+        final Observable<MarkedEdge> edges =
             Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesToTarget" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
@@ -329,8 +329,8 @@ public class GraphManagerImpl implements GraphManager {
 
 
     @Override
-    public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
-        final Observable<Edge> edges =
+    public Observable<MarkedEdge> loadEdgesFromSourceByType( final SearchByIdType search ) {
+        final Observable<MarkedEdge> edges =
             Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesFromSourceByType" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
@@ -344,8 +344,8 @@ public class GraphManagerImpl implements GraphManager {
 
 
     @Override
-    public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
-        final Observable<Edge> edges =
+    public Observable<MarkedEdge> loadEdgesToTargetByType( final SearchByIdType search ) {
+        final Observable<MarkedEdge> edges =
             Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesToTargetByType" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
@@ -480,7 +480,7 @@ public class GraphManagerImpl implements GraphManager {
                     }
 
                     //if any one of these is true, we filter it
-                    return !(simpleMarkedEdge.isDeleted() || simpleMarkedEdge.isSourceNodeDelete() || simpleMarkedEdge.isTargetNodeDeleted());
+                    return !simpleMarkedEdge.isDeleted() &&  !simpleMarkedEdge.isSourceNodeDelete() && !simpleMarkedEdge.isTargetNodeDeleted();
                 });
             } );
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
index c6dc2e4..9c35e2e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
@@ -29,50 +29,61 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 
 /**
  * Simple bean to represent our edge
+ *
  * @author tnine
  */
-public class SimpleMarkedEdge extends  SimpleEdge implements MarkedEdge {
+public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge {
+
+    private boolean isDeleted;
+    private boolean isSourceNodeDeleted;
+    private boolean isTargetNodeDeleted;
 
-    private final boolean deleted;
-    private final boolean isSourceNodeDeleted;
-    private final boolean isTargetNodeDeleted;
 
+    /**
+     * No-arg constructor required by Jackson for deserialization; not intended to be called directly
+     */
+    public SimpleMarkedEdge() {
+    }
 
-    public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp, final boolean deleted) {
 
-        this( sourceNode, type, targetNode, timestamp, deleted, false, false );
+    public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp,
+                             final boolean isDeleted ) {
+
+        this( sourceNode, type, targetNode, timestamp, isDeleted, false, false );
     }
 
 
     public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp,
-                             final boolean deleted, final boolean isSourceNodeDeleted,
+                             final boolean isDeleted, final boolean isSourceNodeDeleted,
                              final boolean isTargetNodeDeleted ) {
         super( sourceNode, type, targetNode, timestamp );
-        this.deleted = deleted;
+        this.isDeleted = isDeleted;
         this.isSourceNodeDeleted = isSourceNodeDeleted;
         this.isTargetNodeDeleted = isTargetNodeDeleted;
     }
 
 
-    public SimpleMarkedEdge(final Edge edge, final boolean deleted){
-        this(edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), deleted);
+    public SimpleMarkedEdge( final Edge edge, final boolean isDeleted ) {
+        this( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), isDeleted );
     }
 
 
     @Override
     @JsonIgnore
     public boolean isDeleted() {
-        return deleted;
+        return isDeleted;
     }
 
 
     @Override
+    @JsonIgnore
     public boolean isSourceNodeDelete() {
         return isSourceNodeDeleted;
     }
 
 
     @Override
+    @JsonIgnore
     public boolean isTargetNodeDeleted() {
         return isTargetNodeDeleted;
     }
@@ -92,7 +103,7 @@ public class SimpleMarkedEdge extends  SimpleEdge implements MarkedEdge {
 
         final SimpleMarkedEdge that = ( SimpleMarkedEdge ) o;
 
-        if ( deleted != that.deleted ) {
+        if ( isDeleted != that.isDeleted ) {
             return false;
         }
 
@@ -103,7 +114,7 @@ public class SimpleMarkedEdge extends  SimpleEdge implements MarkedEdge {
     @Override
     public int hashCode() {
         int result = super.hashCode();
-        result = 31 * result + ( deleted ? 1 : 0 );
+        result = 31 * result + ( isDeleted ? 1 : 0 );
         result = 31 * result + ( isSourceNodeDeleted ? 1 : 0 );
         result = 31 * result + ( isTargetNodeDeleted ? 1 : 0 );
         return result;
@@ -113,7 +124,7 @@ public class SimpleMarkedEdge extends  SimpleEdge implements MarkedEdge {
     @Override
     public String toString() {
         return "SimpleMarkedEdge{" +
-            "deleted=" + deleted +
+            "deleted=" + isDeleted +
             ", isSourceNodeDeleted=" + isSourceNodeDeleted +
             ", isTargetNodeDeleted=" + isTargetNodeDeleted +
             "} " + super.toString();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
index eda3a02..9a95c26 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
@@ -95,7 +95,7 @@ public class GraphManagerIT {
 
         SearchByEdgeType search = createSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTimestamp(), null );
 
-        Observable<Edge> edges = gm.loadEdgesFromSource( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesFromSource( search );
 
         //implicitly blows up if more than 1 is returned from "single"
         Edge returned = edges.toBlocking().last();
@@ -127,7 +127,7 @@ public class GraphManagerIT {
 
         SearchByEdgeType search = createSearchByEdge( edge.getTargetNode(), edge.getType(), edge.getTimestamp(), null );
 
-        Observable<Edge> edges = gm.loadEdgesToTarget( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesToTarget( search );
 
         //implicitly blows up if more than 1 is returned from "single"
         Edge returned = edges.toBlocking().single();
@@ -161,7 +161,7 @@ public class GraphManagerIT {
 
         SearchByEdgeType search = createSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTimestamp(), null );
 
-        Observable<Edge> edges = gm.loadEdgesFromSource( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesFromSource( search );
 
         //implicitly blows up if more than 1 is returned from "single"
         Edge returned = edges.toBlocking().single();
@@ -196,7 +196,7 @@ public class GraphManagerIT {
 
         SearchByEdgeType search = createSearchByEdge( edge.getTargetNode(), edge.getType(), edge.getTimestamp(), null );
 
-        Observable<Edge> edges = gm.loadEdgesToTarget( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesToTarget( search );
 
         //implicitly blows up if more than 1 is returned from "single"
         Edge returned = edges.toBlocking().single();
@@ -248,10 +248,10 @@ public class GraphManagerIT {
         SearchByEdgeType search =
             createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getTimestamp(), null );
 
-        Observable<Edge> edges = gm.loadEdgesFromSource( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesFromSource( search );
 
         //implicitly blows up if more than 1 is returned from "single"
-        Iterator<Edge> returned = edges.toBlocking().getIterator();
+        Iterator<MarkedEdge> returned = edges.toBlocking().getIterator();
 
         assertEquals( "Correct edge returned", edge3, returned.next() );
         assertEquals( "Correct edge returned", edge2, returned.next() );
@@ -321,10 +321,10 @@ public class GraphManagerIT {
         SearchByEdgeType search =
             createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getTimestamp(), null );
 
-        Observable<Edge> edges = gm.loadEdgesToTarget( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesToTarget( search );
 
         //implicitly blows up if more than 1 is returned from "single"
-        Iterator<Edge> returned = edges.toBlocking().getIterator();
+        Iterator<MarkedEdge> returned = edges.toBlocking().getIterator();
 
         assertEquals( "Correct edge returned", edge3, returned.next() );
         assertEquals( "Correct edge returned", edge2, returned.next() );
@@ -387,10 +387,10 @@ public class GraphManagerIT {
         SearchByEdgeType search =
             createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getTimestamp(), null );
 
-        Observable<Edge> edges = gm.loadEdgesFromSource( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesFromSource( search );
 
         //implicitly blows up if more than 1 is returned from "single"
-        Iterator<Edge> returned = edges.toBlocking().getIterator();
+        Iterator<MarkedEdge> returned = edges.toBlocking().getIterator();
 
 
         //we have 3 edges, but we specified our first edge as the max, we shouldn't get any more results than the first
@@ -443,10 +443,10 @@ public class GraphManagerIT {
         SearchByEdgeType search =
             createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getTimestamp(), null );
 
-        Observable<Edge> edges = gm.loadEdgesToTarget( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesToTarget( search );
 
         //implicitly blows up if more than 1 is returned from "single"
-        Iterator<Edge> returned = edges.toBlocking().getIterator();
+        Iterator<MarkedEdge> returned = edges.toBlocking().getIterator();
 
 
         //we have 3 edges, but we specified our first edge as the max, we shouldn't get any more results than the first
@@ -487,7 +487,7 @@ public class GraphManagerIT {
         SearchByIdType search = createSearchByEdgeAndId( edge.getSourceNode(), edge.getType(), edge.getTimestamp(),
             edge.getTargetNode().getType(), null );
 
-        Observable<Edge> edges = gm.loadEdgesFromSourceByType( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesFromSourceByType( search );
 
         //implicitly blows up if more than 1 is returned from "single"
         Edge returned = edges.toBlocking().single();
@@ -524,7 +524,7 @@ public class GraphManagerIT {
         SearchByIdType search = createSearchByEdgeAndId( edge.getTargetNode(), edge.getType(), edge.getTimestamp(),
             edge.getSourceNode().getType(), null );
 
-        Observable<Edge> edges = gm.loadEdgesToTargetByType( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesToTargetByType( search );
 
         //implicitly blows up if more than 1 is returned from "single"
         Edge returned = edges.toBlocking().single();
@@ -560,7 +560,7 @@ public class GraphManagerIT {
 
         SearchByEdgeType search = createSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTimestamp(), null );
 
-        Observable<Edge> edges = gm.loadEdgesFromSource( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesFromSource( search );
 
         //implicitly blows up if more than 1 is returned from "single"
         Edge returned = edges.toBlocking().single();
@@ -629,7 +629,7 @@ public class GraphManagerIT {
 
         SearchByEdgeType search = createSearchByEdge( edge.getTargetNode(), edge.getType(), edge.getTimestamp(), null );
 
-        Observable<Edge> edges = gm.loadEdgesToTarget( search );
+        Observable<MarkedEdge> edges = gm.loadEdgesToTarget( search );
 
         //implicitly blows up if more than 1 is returned from "single"
         Edge returned = edges.toBlocking().single();
@@ -988,11 +988,11 @@ public class GraphManagerIT {
 
 
         //get our 2 edges
-        Observable<Edge> edges =
+        Observable<MarkedEdge> edges =
             gm.loadEdgesFromSource( createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
 
 
-        Iterator<Edge> results = edges.toBlocking().getIterator();
+        Iterator<MarkedEdge> results = edges.toBlocking().getIterator();
 
 
         assertEquals( "Edges correct", edge2, results.next() );
@@ -1060,11 +1060,11 @@ public class GraphManagerIT {
 
 
         //get our 2 edges
-        Observable<Edge> edges =
+        Observable<MarkedEdge> edges =
             gm.loadEdgesToTarget( createSearchByEdge( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) );
 
 
-        Iterator<Edge> results = edges.toBlocking().getIterator();
+        Iterator<MarkedEdge> results = edges.toBlocking().getIterator();
 
 
         assertEquals( "Edges correct", edge2, results.next() );
@@ -1140,11 +1140,11 @@ public class GraphManagerIT {
 
 
         //get our 2 edges
-        Observable<Edge> edges = gm.loadEdgesFromSource(
+        Observable<MarkedEdge> edges = gm.loadEdgesFromSource(
             createSearchByEdgeUnfiltered( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
 
 
-        Iterator<Edge> results = edges.toBlocking().getIterator();
+        Iterator<MarkedEdge> results = edges.toBlocking().getIterator();
 
 
         assertEquals( "Edges correct", edge2.getTargetNode(), results.next().getTargetNode() );
@@ -1211,11 +1211,11 @@ public class GraphManagerIT {
         gm.markEdge( edge1 ).toBlocking().last();
 
         //get our 2 edges
-        Observable<Edge> edges = gm.loadEdgesToTarget(
+        Observable<MarkedEdge> edges = gm.loadEdgesToTarget(
             createSearchByEdgeUnfiltered( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) );
 
 
-        Iterator<Edge> results = edges.toBlocking().getIterator();
+        Iterator<MarkedEdge> results = edges.toBlocking().getIterator();
 
 
         assertEquals( "Edges correct", edge2.getSourceNode(), results.next().getSourceNode() );
@@ -1282,11 +1282,11 @@ public class GraphManagerIT {
 
 
         //get our 2 edges
-        Observable<Edge> edges = gm.loadEdgesFromSourceByType(
+        Observable<MarkedEdge> edges = gm.loadEdgesFromSourceByType(
             createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) );
 
 
-        Iterator<Edge> results = edges.toBlocking().getIterator();
+        Iterator<MarkedEdge> results = edges.toBlocking().getIterator();
 
 
         assertEquals( "Edges correct", edge1, results.next() );
@@ -1359,11 +1359,11 @@ public class GraphManagerIT {
         final long maxVersion = System.currentTimeMillis();
 
         //get our 2 edges
-        Observable<Edge> edges = gm.loadEdgesToTargetByType(
+        Observable<MarkedEdge> edges = gm.loadEdgesToTargetByType(
             createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) );
 
 
-        Iterator<Edge> results = edges.toBlocking().getIterator();
+        Iterator<MarkedEdge> results = edges.toBlocking().getIterator();
 
 
         assertEquals( "Edges correct", edge1, results.next() );
@@ -1435,7 +1435,7 @@ public class GraphManagerIT {
 
         final long maxVersion = System.currentTimeMillis();
 
-        Iterator<Edge> results =
+        Iterator<MarkedEdge> results =
             gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking()
               .getIterator();
 
@@ -1547,7 +1547,7 @@ public class GraphManagerIT {
 
         final long maxVersion = System.currentTimeMillis();
 
-        Iterator<Edge> results =
+        Iterator<MarkedEdge> results =
             gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking()
               .getIterator();
 
@@ -1627,7 +1627,7 @@ public class GraphManagerIT {
 
         final long maxVersion = System.currentTimeMillis();
 
-        Iterator<Edge> results =
+        Iterator<MarkedEdge> results =
             gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking()
               .getIterator();
 
@@ -1764,7 +1764,7 @@ public class GraphManagerIT {
 
         final long maxVersion = System.currentTimeMillis();
 
-        Iterator<Edge> results =
+        Iterator<MarkedEdge> results =
             gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking()
               .getIterator();
 
@@ -1842,7 +1842,7 @@ public class GraphManagerIT {
 
         final long maxVersion = System.currentTimeMillis();
 
-        Iterator<Edge> results =
+        Iterator<MarkedEdge> results =
             gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking()
               .getIterator();
 
@@ -1983,7 +1983,7 @@ public class GraphManagerIT {
 
         final long maxVersion = System.currentTimeMillis();
 
-        Iterator<Edge> results =
+        Iterator<MarkedEdge> results =
             gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking()
               .getIterator();
 
@@ -2356,10 +2356,10 @@ public class GraphManagerIT {
             new SimpleSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getTargetNode(), Long.MAX_VALUE,
                 SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()   );
 
-        final Observable<Edge> edgesDescending = gm.loadEdgeVersions( searchDescending );
+        final Observable<MarkedEdge> edgesDescending = gm.loadEdgeVersions( searchDescending );
 
         //search descending
-        final List<Edge> descending = edgesDescending.toList().toBlocking().single();
+        final List<MarkedEdge> descending = edgesDescending.toList().toBlocking().single();
 
         assertEquals( "Correct size returned", 3, descending.size() );
 
@@ -2376,9 +2376,9 @@ public class GraphManagerIT {
                     new SimpleSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getTargetNode(), 0,
                         SearchByEdgeType.Order.ASCENDING, Optional.<Edge>absent()   );
 
-        Observable<Edge> edgesAscending = gm.loadEdgeVersions( searchAscending );
+        Observable<MarkedEdge> edgesAscending = gm.loadEdgeVersions( searchAscending );
 
-        List<Edge> ascending = edgesAscending.toList().toBlocking().single();
+        List<MarkedEdge> ascending = edgesAscending.toList().toBlocking().single();
 
         assertEquals( "Correct size returned", 3, ascending.size() );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java
index b922e7c..22683f6 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java
@@ -113,7 +113,7 @@ public class GraphManagerLoadTest {
 
 
             @Override
-            public Observable<Edge> doSearch( final GraphManager manager ) {
+            public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
                  return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING,  Optional
                                       .<Edge>absent()) );
             }
@@ -141,7 +141,7 @@ public class GraphManagerLoadTest {
 
 
             @Override
-            public Observable<Edge> doSearch( final GraphManager manager ) {
+            public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
                 return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING,  Optional.<Edge>absent() ) );
             }
         };
@@ -220,7 +220,7 @@ public class GraphManagerLoadTest {
             final CountDownLatch latch = new CountDownLatch( 1 );
 
 
-            generator.doSearch( manager ).take( readCount ).buffer( 1000 ).subscribe( new Subscriber<List<Edge>>() {
+            generator.doSearch( manager ).take( readCount ).buffer( 1000 ).subscribe( new Subscriber<List<MarkedEdge>>() {
                 @Override
                 public void onCompleted() {
                     timer.stop();
@@ -235,7 +235,7 @@ public class GraphManagerLoadTest {
 
 
                 @Override
-                public void onNext( final List<Edge> edges ) {
+                public void onNext( final List<MarkedEdge> edges ) {
                     log.info("Read {} edges", edges.size());
                 }
             } );
@@ -263,6 +263,6 @@ public class GraphManagerLoadTest {
          * @param manager
          * @return
          */
-        public Observable<Edge> doSearch( final GraphManager manager );
+        public Observable<MarkedEdge> doSearch( final GraphManager manager );
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index c1917bb..6aad289 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -176,7 +176,7 @@ public class GraphManagerShardConsistencyIT {
 
 
             @Override
-            public Observable<Edge> doSearch( final GraphManager manager ) {
+            public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
                 return manager.loadEdgesFromSource(
                     new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
                         Optional.<Edge>absent() ) );
@@ -409,7 +409,7 @@ public class GraphManagerShardConsistencyIT {
 
 
             @Override
-            public Observable<Edge> doSearch( final GraphManager manager ) {
+            public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
                 return manager.loadEdgesFromSource(
                     new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
                         Optional.<Edge>absent(), false ) );
@@ -729,7 +729,7 @@ public class GraphManagerShardConsistencyIT {
         /**
          * Perform the search returning an observable edge
          */
-        public Observable<Edge> doSearch( final GraphManager manager );
+        public Observable<MarkedEdge> doSearch( final GraphManager manager );
     }
 }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
index 2889684..6a2efc9 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
@@ -107,25 +107,25 @@ public class GraphManagerStressTest {
 
 
             @Override
-            public Observable<Edge> doSearch( final GraphManager manager ) {
+            public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
 
 
                 final long timestamp = System.currentTimeMillis();
 
 
-                return Observable.create( new Observable.OnSubscribe<Edge>() {
+                return Observable.create( new Observable.OnSubscribe<MarkedEdge>() {
 
                     @Override
-                    public void call( final Subscriber<? super Edge> subscriber ) {
+                    public void call( final Subscriber<? super MarkedEdge> subscriber ) {
                         try {
                             for ( Id sourceId : sourceIds ) {
 
-                                final Iterable<Edge> edges = manager.loadEdgesFromSource(
+                                final Iterable<MarkedEdge> edges = manager.loadEdgesFromSource(
                                         new SimpleSearchByEdgeType( sourceId, "test", timestamp, SearchByEdgeType.Order.DESCENDING,  Optional
                                                                                     .<Edge>absent() ) )
                                                                     .toBlocking().toIterable();
 
-                                for ( Edge edge : edges ) {
+                                for ( MarkedEdge edge : edges ) {
                                     log.debug( "Firing on next for edge {}", edge );
 
                                     subscriber.onNext( edge );
@@ -195,7 +195,7 @@ public class GraphManagerStressTest {
 
 
             @Override
-            public Observable<Edge> doSearch( final GraphManager manager ) {
+            public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
                 return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING,  Optional.<Edge>absent() ) );
             }
         };
@@ -222,7 +222,7 @@ public class GraphManagerStressTest {
 
 
             @Override
-            public Observable<Edge> doSearch( final GraphManager manager ) {
+            public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
 
                 return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING,  Optional.<Edge>absent() ) );
             }
@@ -307,6 +307,6 @@ public class GraphManagerStressTest {
          */
         public Edge newEdge();
 
-        public Observable<Edge> doSearch( final GraphManager manager );
+        public Observable<MarkedEdge> doSearch( final GraphManager manager );
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java b/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
index d9d3d0d..6e84601 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
@@ -47,6 +47,7 @@ import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsExcept
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
@@ -239,7 +240,7 @@ public class AppInfoMigrationPlugin implements MigrationPlugin {
         final EntityCollectionManager managementCollectionManager =
             entityCollectionManagerFactory.createCollectionManager( managementAppScope );
 
-        Observable<Edge> edgesObservable = getApplicationInfoEdges( appId );
+        Observable<MarkedEdge> edgesObservable = getApplicationInfoEdges( appId );
         //get the graph for all app infos
         Observable<org.apache.usergrid.persistence.model.entity.Entity> entityObs = edgesObservable.flatMap( edge -> {
             final Id appInfoId = edge.getTargetNode();
@@ -299,7 +300,7 @@ public class AppInfoMigrationPlugin implements MigrationPlugin {
     }
 
 
-    public Observable<Edge> getApplicationInfoEdges( final UUID applicationId ) {
+    public Observable<MarkedEdge> getApplicationInfoEdges( final UUID applicationId ) {
         final ApplicationScope managementAppScope = getApplicationScope( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
         final GraphManager gm = graphManagerFactory.createEdgeManager( managementAppScope );