You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/10 12:58:26 UTC

incubator-usergrid git commit: Migrated filter to a transform operation

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-614 99cb70052 -> 59ad6a989


Migrated filter to a transform operation

Updated tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/59ad6a98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/59ad6a98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/59ad6a98

Branch: refs/heads/USERGRID-614
Commit: 59ad6a9897fcdaf31c048a46426873b42915e4b7
Parents: 99cb700
Author: Todd Nine <tn...@apigee.com>
Authored: Sun May 10 04:58:24 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Sun May 10 04:58:24 2015 -0600

----------------------------------------------------------------------
 .../graph/impl/GraphManagerImpl.java            | 144 ++--
 .../graph/impl/SimpleSearchByIdType.java        |   8 +
 .../persistence/graph/GraphManagerIT.java       | 664 ++++++++++++++++---
 .../graph/test/util/EdgeTestUtils.java          |  27 +
 4 files changed, 672 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59ad6a98/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 06cb5a1..e4ce4fd 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -51,7 +51,6 @@ import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
-import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -60,7 +59,6 @@ import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
-import rx.functions.Func1;
 
 
 /**
@@ -293,7 +291,7 @@ public class GraphManagerImpl implements GraphManager {
                     return storageEdgeSerialization.getEdgeVersions( scope, searchByEdge );
                 }
             } ).buffer( graphFig.getScanPageSize() )
-                      .flatMap( new EdgeBufferFilter( searchByEdge.getMaxTimestamp(), searchByEdge.filterMarked() ) );
+                      .compose( new EdgeBufferFilter( searchByEdge.getMaxTimestamp(), searchByEdge.filterMarked() ) );
 
         return ObservableTimer.time( edges, loadEdgesVersionsTimer );
     }
@@ -308,7 +306,7 @@ public class GraphManagerImpl implements GraphManager {
                     return storageEdgeSerialization.getEdgesFromSource( scope, search );
                 }
             } ).buffer( graphFig.getScanPageSize() )
-                      .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+                      .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
 
         return ObservableTimer.time( edges, loadEdgesFromSourceTimer );
     }
@@ -323,7 +321,7 @@ public class GraphManagerImpl implements GraphManager {
                     return storageEdgeSerialization.getEdgesToTarget( scope, search );
                 }
             } ).buffer( graphFig.getScanPageSize() )
-                      .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+                      .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
 
 
         return ObservableTimer.time( edges, loadEdgesToTargetTimer );
@@ -334,14 +332,14 @@ public class GraphManagerImpl implements GraphManager {
     public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
         final Observable<Edge> edges =
             Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
-                    @Override
-                    protected Iterator<MarkedEdge> getIterator() {
-                        return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search );
-                    }
-                } ).buffer( graphFig.getScanPageSize() )
-                      .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+                @Override
+                protected Iterator<MarkedEdge> getIterator() {
+                    return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search );
+                }
+            } ).buffer( graphFig.getScanPageSize() )
+                      .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
 
-        return ObservableTimer.time( edges, loadEdgesFromSourceTimer );
+        return ObservableTimer.time( edges, loadEdgesFromSourceByTypeTimer );
     }
 
 
@@ -349,12 +347,12 @@ public class GraphManagerImpl implements GraphManager {
     public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
         final Observable<Edge> edges =
             Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
-                    @Override
-                    protected Iterator<MarkedEdge> getIterator() {
-                        return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search );
-                    }
-                } ).buffer( graphFig.getScanPageSize() )
-                      .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+                @Override
+                protected Iterator<MarkedEdge> getIterator() {
+                    return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search );
+                }
+            } ).buffer( graphFig.getScanPageSize() )
+                      .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
 
         return ObservableTimer.time( edges, loadEdgesToTargetByTypeTimer );
     }
@@ -362,13 +360,13 @@ public class GraphManagerImpl implements GraphManager {
 
     @Override
     public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) {
-       final Observable<String> edgeTypes = Observable.create(
-            new ObservableIterator<String>( "getEdgeTypesFromSource" ) {
-                @Override
-                protected Iterator<String> getIterator() {
-                    return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
-                }
-            } );
+        final Observable<String> edgeTypes =
+            Observable.create( new ObservableIterator<String>( "getEdgeTypesFromSource" ) {
+                    @Override
+                    protected Iterator<String> getIterator() {
+                        return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
+                    }
+                } );
 
         return ObservableTimer.time( edgeTypes, getEdgeTypesFromSourceTimer );
     }
@@ -376,12 +374,13 @@ public class GraphManagerImpl implements GraphManager {
 
     @Override
     public Observable<String> getIdTypesFromSource( final SearchIdType search ) {
-        final Observable<String> edgeTypes =  Observable.create( new ObservableIterator<String>( "getIdTypesFromSource" ) {
-            @Override
-            protected Iterator<String> getIterator() {
-                return edgeMetadataSerialization.getIdTypesFromSource( scope, search );
-            }
-        } );
+        final Observable<String> edgeTypes =
+            Observable.create( new ObservableIterator<String>( "getIdTypesFromSource" ) {
+                @Override
+                protected Iterator<String> getIterator() {
+                    return edgeMetadataSerialization.getIdTypesFromSource( scope, search );
+                }
+            } );
 
         return ObservableTimer.time( edgeTypes, getIdTypesFromSourceTimer );
     }
@@ -389,22 +388,21 @@ public class GraphManagerImpl implements GraphManager {
 
     @Override
     public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) {
-        final Observable<String> edgeTypes =   Observable.create(
-            new ObservableIterator<String>( "getEdgeTypesToTarget" ) {
-                @Override
-                protected Iterator<String> getIterator() {
-                    return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
-                }
-            } );
+        final Observable<String> edgeTypes =
+            Observable.create( new ObservableIterator<String>( "getEdgeTypesToTarget" ) {
+                    @Override
+                    protected Iterator<String> getIterator() {
+                        return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
+                    }
+                } );
 
-        return ObservableTimer.time( edgeTypes, getEdgeTypesFromSourceTimer );
+        return ObservableTimer.time( edgeTypes, getEdgeTypesToTargetTimer );
     }
 
 
     @Override
     public Observable<String> getIdTypesToTarget( final SearchIdType search ) {
-        final Observable<String> edgeTypes =  Observable.create(
-            new ObservableIterator<String>( "getIdTypesToTarget" ) {
+        final Observable<String> edgeTypes = Observable.create( new ObservableIterator<String>( "getIdTypesToTarget" ) {
                 @Override
                 protected Iterator<String> getIterator() {
                     return edgeMetadataSerialization.getIdTypesToTarget( scope, search );
@@ -418,7 +416,9 @@ public class GraphManagerImpl implements GraphManager {
     /**
      * Helper filter to perform mapping and return an observable of pre-filtered edges
      */
-    private class EdgeBufferFilter implements Func1<List<MarkedEdge>, Observable<MarkedEdge>> {
+    private class EdgeBufferFilter implements
+        Observable.Transformer<List<MarkedEdge>, MarkedEdge> {//implements Func1<List<MarkedEdge>,
+        // Observable<MarkedEdge>> {
 
         private final long maxVersion;
         private final boolean filterMarked;
@@ -430,46 +430,60 @@ public class GraphManagerImpl implements GraphManager {
         }
 
 
+        @Override
+
         /**
          * Takes a buffered list of marked edges.  It then does a single round trip to fetch marked ids These are then
          * used in conjunction with the max version filter to filter any edges that should not be returned
          *
          * @return An observable that emits only edges that can be consumed.  There could be multiple versions of the
          * same edge so those need de-duped.
-         */
-        @Override
-        public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
+         */ public Observable<MarkedEdge> call( final Observable<List<MarkedEdge>> markedEdgesObservable ) {
 
-            final Map<Id, Long> markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges );
-            final long maxTimestamp = maxVersion;
+            return markedEdgesObservable.flatMap( markedEdges -> {
 
-            return Observable.from( markedEdges ).filter( edge -> {
-                final long edgeTimestamp = edge.getTimestamp();
+                final Observable<MarkedEdge> markedEdgeObservable = Observable.from( markedEdges );
 
-                //our edge needs to not be deleted and have a version that's > max Version
-                if (( edge.isDeleted() && filterMarked) || Long.compare( edgeTimestamp, maxTimestamp ) > 0 ) {
-                    return false;
+                /**
+                 * We aren't going to filter anything, return exactly what we're passed
+                 */
+                if(!filterMarked){
+                    return markedEdgeObservable;
                 }
 
+                //We need to filter, perform that filter
+                final Map<Id, Long> markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges );
 
-                final Long sourceTimestamp = markedVersions.get( edge.getSourceNode() );
+                return markedEdgeObservable.filter( edge -> {
+                    final long edgeTimestamp = edge.getTimestamp();
 
-                //the source Id has been marked for deletion.  It's version is <= to the marked version for deletion,
-                // so we need to discard it
-                if ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ) {
-                    return false;
-                }
+                    //our edge needs to not be deleted and have a version that's > max Version
+                    if ( edge.isDeleted() || Long.compare( edgeTimestamp, maxVersion ) > 0 ) {
+                        return false;
+                    }
 
-                final Long targetTimestamp = markedVersions.get( edge.getTargetNode() );
 
-                //the target Id has been marked for deletion.  It's version is <= to the marked version for deletion,
-                // so we need to discard it
-                if ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ) {
-                    return false;
-                }
+                    final Long sourceTimestamp = markedVersions.get( edge.getSourceNode() );
+
+                    //the source Id has been marked for deletion.  It's version is <= to the marked version for
+                    // deletion,
+                    // so we need to discard it
+                    if ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ) {
+                        return false;
+                    }
+
+                    final Long targetTimestamp = markedVersions.get( edge.getTargetNode() );
+
+                    //the target Id has been marked for deletion.  It's version is <= to the marked version for
+                    // deletion,
+                    // so we need to discard it
+                    if ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ) {
+                        return false;
+                    }
 
 
-                return true;
+                    return true;
+                } );
             } );
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59ad6a98/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java
index 9d989a3..bcbff0a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java
@@ -54,6 +54,14 @@ public class SimpleSearchByIdType extends SimpleSearchByEdgeType implements Sear
     }
 
 
+    public SimpleSearchByIdType( final Id node, final String type, final long maxTimestamp, final Order order, final String idType,
+                                 final Optional<Edge> last, final boolean filterMarked ) {
+        super( node, type, maxTimestamp, order, last, filterMarked );
+        ValidationUtils.verifyString( idType, "idType" );
+        this.idType = idType;
+    }
+
+
     @Override
     public String getIdType() {
         return idType;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59ad6a98/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
index 75f61f0..e512608 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
@@ -43,18 +43,21 @@ import com.google.inject.Inject;
 
 import rx.Observable;
 
+import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createGetByEdge;
-import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdge;
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdgeAndId;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdgeAndIdUnfiltered;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdgeUnfiltered;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-@RunWith(ITRunner.class)
-@UseModules({ TestGraphModule.class })
+
+@RunWith( ITRunner.class )
+@UseModules( { TestGraphModule.class } )
 public class GraphManagerIT {
 
 
@@ -69,18 +72,16 @@ public class GraphManagerIT {
     protected ApplicationScope scope;
 
 
-
-
     @Before
     public void mockApp() {
-        this.scope = new ApplicationScopeImpl(createId("application")  );
+        this.scope = new ApplicationScopeImpl( createId( "application" ) );
     }
 
 
     @Test
     public void testWriteReadEdgeTypeSource() throws TimeoutException, InterruptedException {
 
-        GraphManager gm =  emf.createEdgeManager( scope );
+        GraphManager gm = emf.createEdgeManager( scope );
 
         Edge edge = createEdge( "source", "test", "target" );
 
@@ -112,7 +113,7 @@ public class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypeTarget() throws TimeoutException, InterruptedException {
 
-        GraphManager gm =  emf.createEdgeManager( scope );
+        GraphManager gm = emf.createEdgeManager( scope );
 
         Edge edge = createEdge( "source", "test", "target" );
 
@@ -144,7 +145,7 @@ public class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypeVersionSource() throws TimeoutException, InterruptedException {
 
-        GraphManager gm =  emf.createEdgeManager( scope );
+        GraphManager gm = emf.createEdgeManager( scope );
 
         final long earlyVersion = 1000l;
 
@@ -178,7 +179,7 @@ public class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypeVersionTarget() throws TimeoutException, InterruptedException {
 
-        GraphManager gm =  emf.createEdgeManager( scope );
+        GraphManager gm = emf.createEdgeManager( scope );
 
         final long earlyVersion = 10000l;
 
@@ -216,7 +217,7 @@ public class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypeVersionSourceDistinct() throws TimeoutException, InterruptedException {
 
-        GraphManager gm =  emf.createEdgeManager( scope );
+        GraphManager gm = emf.createEdgeManager( scope );
 
         final long earlyVersion = 10000l;
 
@@ -241,7 +242,7 @@ public class GraphManagerIT {
         //now test retrieving it, we should only get edge3, since it's the latest
 
         SearchByEdgeType search =
-                createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getTimestamp(), null );
+            createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getTimestamp(), null );
 
         Observable<Edge> edges = gm.loadEdgesFromSource( search );
 
@@ -288,7 +289,7 @@ public class GraphManagerIT {
     public void testWriteReadEdgeTypeVersionTargetDistinct() throws TimeoutException, InterruptedException {
 
 
-        GraphManager gm =  emf.createEdgeManager( scope );
+        GraphManager gm = emf.createEdgeManager( scope );
 
 
         final long earlyVersion = 10000l;
@@ -314,7 +315,7 @@ public class GraphManagerIT {
         //now test retrieving it, we should only get edge3, since it's the latest
 
         SearchByEdgeType search =
-                createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getTimestamp(), null );
+            createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getTimestamp(), null );
 
         Observable<Edge> edges = gm.loadEdgesToTarget( search );
 
@@ -360,7 +361,7 @@ public class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypePagingSource() throws TimeoutException, InterruptedException {
 
-        GraphManager gm =  emf.createEdgeManager( scope );
+        GraphManager gm = emf.createEdgeManager( scope );
         final Id sourceId = IdGenerator.createId( "source" );
 
 
@@ -380,7 +381,7 @@ public class GraphManagerIT {
         //now test retrieving it
 
         SearchByEdgeType search =
-                createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getTimestamp(), null );
+            createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getTimestamp(), null );
 
         Observable<Edge> edges = gm.loadEdgesFromSource( search );
 
@@ -415,7 +416,7 @@ public class GraphManagerIT {
     public void testWriteReadEdgeTypePagingTarget() {
 
 
-        GraphManager gm =  emf.createEdgeManager( scope );
+        GraphManager gm = emf.createEdgeManager( scope );
 
 
         final Id targetId = IdGenerator.createId( "target" );
@@ -436,7 +437,7 @@ public class GraphManagerIT {
         //now test retrieving it
 
         SearchByEdgeType search =
-                createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getTimestamp(), null );
+            createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getTimestamp(), null );
 
         Observable<Edge> edges = gm.loadEdgesToTarget( search );
 
@@ -470,7 +471,7 @@ public class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypeTargetTypeSource() {
 
-        GraphManager gm =  emf.createEdgeManager( scope );
+        GraphManager gm = emf.createEdgeManager( scope );
 
 
         Edge edge = createEdge( "source", "test", "target" );
@@ -480,7 +481,7 @@ public class GraphManagerIT {
         //now test retrieving it
 
         SearchByIdType search = createSearchByEdgeAndId( edge.getSourceNode(), edge.getType(), edge.getTimestamp(),
-                edge.getTargetNode().getType(), null );
+            edge.getTargetNode().getType(), null );
 
         Observable<Edge> edges = gm.loadEdgesFromSourceByType( search );
 
@@ -492,7 +493,7 @@ public class GraphManagerIT {
 
         //change edge type to be invalid, shouldn't get a result
         search = createSearchByEdgeAndId( edge.getSourceNode(), edge.getType(), edge.getTimestamp(),
-                edge.getTargetNode().getType() + "invalid", null );
+            edge.getTargetNode().getType() + "invalid", null );
 
         edges = gm.loadEdgesFromSourceByType( search );
 
@@ -506,7 +507,7 @@ public class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypeTargetTypeTarget() {
 
-        GraphManager gm =  emf.createEdgeManager( scope );
+        GraphManager gm = emf.createEdgeManager( scope );
         ;
 
 
@@ -517,7 +518,7 @@ public class GraphManagerIT {
         //now test retrieving it
 
         SearchByIdType search = createSearchByEdgeAndId( edge.getTargetNode(), edge.getType(), edge.getTimestamp(),
-                edge.getSourceNode().getType(), null );
+            edge.getSourceNode().getType(), null );
 
         Observable<Edge> edges = gm.loadEdgesToTargetByType( search );
 
@@ -529,7 +530,7 @@ public class GraphManagerIT {
 
         //change edge type to be invalid, shouldn't get a result
         search = createSearchByEdgeAndId( edge.getTargetNode(), edge.getType(), edge.getTimestamp(),
-                edge.getSourceNode().getType() + "invalid", null );
+            edge.getSourceNode().getType() + "invalid", null );
 
         edges = gm.loadEdgesToTargetByType( search );
 
@@ -543,7 +544,7 @@ public class GraphManagerIT {
     @Test
     public void testWriteReadEdgeDeleteSource() {
 
-        GraphManager gm =  emf.createEdgeManager( scope );
+        GraphManager gm = emf.createEdgeManager( scope );
 
 
         Edge edge = createEdge( "source", "test", "target" );
@@ -563,7 +564,7 @@ public class GraphManagerIT {
         assertEquals( "Correct edge returned", edge, returned );
 
         SearchByIdType searchById = createSearchByEdgeAndId( edge.getSourceNode(), edge.getType(), edge.getTimestamp(),
-                edge.getTargetNode().getType(), null );
+            edge.getTargetNode().getType(), null );
 
         edges = gm.loadEdgesFromSourceByType( searchById );
 
@@ -573,8 +574,7 @@ public class GraphManagerIT {
         assertEquals( "Correct edge returned", edge, returned );
 
         final SearchByEdge searchByEdge =
-                createGetByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(),
-                        null );
+            createGetByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), null );
 
         returned = gm.loadEdgeVersions( searchByEdge ).toBlocking().single();
 
@@ -613,7 +613,7 @@ public class GraphManagerIT {
     @Test
     public void testWriteReadEdgeDeleteTarget() {
 
-        GraphManager gm =  emf.createEdgeManager( scope );
+        GraphManager gm = emf.createEdgeManager( scope );
 
 
         Edge edge = createEdge( "source", "test", "target" );
@@ -633,7 +633,7 @@ public class GraphManagerIT {
         assertEquals( "Correct edge returned", edge, returned );
 
         SearchByIdType searchById = createSearchByEdgeAndId( edge.getTargetNode(), edge.getType(), edge.getTimestamp(),
-                edge.getSourceNode().getType(), null );
+            edge.getSourceNode().getType(), null );
 
         edges = gm.loadEdgesToTargetByType( searchById );
 
@@ -669,7 +669,7 @@ public class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypesSourceTypes() {
 
-        final GraphManager gm =  emf.createEdgeManager( scope );
+        final GraphManager gm = emf.createEdgeManager( scope );
 
         Id sourceId = new SimpleId( "source" );
         Id targetId1 = new SimpleId( "target" );
@@ -691,7 +691,7 @@ public class GraphManagerIT {
 
         //get our 2 edge types
         Observable<String> edges =
-                gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), null, null ) );
+            gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), null, null ) );
 
 
         Iterator<String> results = edges.toBlocking().getIterator();
@@ -718,8 +718,8 @@ public class GraphManagerIT {
         assertFalse( "No results", results.hasNext() );
 
         //now get types for test2
-        edges = gm.getIdTypesFromSource(
-                new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test2", null, null ) );
+        edges =
+            gm.getIdTypesFromSource( new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test2", null, null ) );
 
         results = edges.toBlocking().getIterator();
 
@@ -732,7 +732,7 @@ public class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypesTargetTypes() {
 
-        final GraphManager gm =  emf.createEdgeManager( scope );
+        final GraphManager gm = emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id sourceId2 = new SimpleId( "source2" );
@@ -797,7 +797,7 @@ public class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypesSourceTypesPaging() {
 
-        final GraphManager gm =  emf.createEdgeManager( scope );
+        final GraphManager gm = emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id targetId1 = new SimpleId( "target" );
@@ -861,7 +861,7 @@ public class GraphManagerIT {
         //now get the next page
 
         edges = gm.getIdTypesFromSource(
-                new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test", null, targetId1.getType() ) );
+            new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test", null, targetId1.getType() ) );
 
         results = edges.toBlocking().getIterator();
 
@@ -875,7 +875,7 @@ public class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypesTargetTypesPaging() {
 
-        final GraphManager gm =  emf.createEdgeManager( scope );
+        final GraphManager gm = emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id sourceId2 = new SimpleId( "source2" );
@@ -942,7 +942,7 @@ public class GraphManagerIT {
         //now get the next page
 
         edges = gm.getIdTypesToTarget(
-                new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", null, sourceId1.getType() ) );
+            new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", null, sourceId1.getType() ) );
 
         results = edges.toBlocking().getIterator();
 
@@ -956,7 +956,7 @@ public class GraphManagerIT {
     @Test
     public void testMarkSourceEdges() throws InterruptedException {
 
-        final GraphManager gm =  emf.createEdgeManager( scope );
+        final GraphManager gm = emf.createEdgeManager( scope );
 
         Id sourceId = new SimpleId( "source" );
         Id targetId1 = new SimpleId( "target" );
@@ -966,11 +966,11 @@ public class GraphManagerIT {
         long startTime = System.currentTimeMillis();
 
         long edge1Time = startTime;
-        long edge2Time = edge1Time+1;
+        long edge2Time = edge1Time + 1;
 
-        final long maxVersion= edge2Time;
+        final long maxVersion = edge2Time;
 
-        Edge edge1 = createEdge( sourceId, "test", targetId1, edge1Time);
+        Edge edge1 = createEdge( sourceId, "test", targetId1, edge1Time );
 
         gm.writeEdge( edge1 ).toBlocking().singleOrDefault( null );
 
@@ -979,21 +979,18 @@ public class GraphManagerIT {
         gm.writeEdge( edge2 ).toBlocking().singleOrDefault( null );
 
 
-
         assertTrue( Long.compare( maxVersion, edge2.getTimestamp() ) >= 0 );
         assertTrue( Long.compare( maxVersion, edge1.getTimestamp() ) >= 0 );
 
 
         //get our 2 edges
-        Observable<Edge> edges = gm.loadEdgesFromSource(
-                createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
+        Observable<Edge> edges =
+            gm.loadEdgesFromSource( createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
 
 
         Iterator<Edge> results = edges.toBlocking().getIterator();
 
 
-        System.out.println( "\n\n\n\n\n\n\n\n\n\n" );
-
         assertEquals( "Edges correct", edge2, results.next() );
 
         assertEquals( "Edges correct", edge1, results.next() );
@@ -1002,15 +999,12 @@ public class GraphManagerIT {
 
         //now delete one of the edges
 
-        System.out.println( "\n\n\n\n\n\n\n\n\n\n" );
 
         gm.markEdge( edge1 ).toBlocking().last();
 
-        System.out.println( "\n\n\n\n\n\n\n\n\n\n" );
 
-
-        edges = gm.loadEdgesFromSource(
-                createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
+        edges =
+            gm.loadEdgesFromSource( createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
 
 
         results = edges.toBlocking().getIterator();
@@ -1020,16 +1014,14 @@ public class GraphManagerIT {
 
         assertFalse( "No more edges", results.hasNext() );
 
-        System.out.println( "\n\n\n\n\n\n\n\n\n\n" );
 
         //now delete one of the edges
 
         gm.markEdge( edge2 ).toBlocking().last();
 
-        System.out.println( "\n\n\n\n\n\n\n\n\n\n" );
 
-        edges = gm.loadEdgesFromSource(
-                createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
+        edges =
+            gm.loadEdgesFromSource( createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
 
 
         results = edges.toBlocking().getIterator();
@@ -1045,7 +1037,7 @@ public class GraphManagerIT {
     @Test
     public void testMarkTargetEdges() {
 
-        final GraphManager gm =  emf.createEdgeManager( scope );
+        final GraphManager gm = emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id sourceId2 = new SimpleId( "source2" );
@@ -1065,7 +1057,7 @@ public class GraphManagerIT {
 
         //get our 2 edges
         Observable<Edge> edges =
-                gm.loadEdgesToTarget( createSearchByEdge( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) );
+            gm.loadEdgesToTarget( createSearchByEdge( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) );
 
 
         Iterator<Edge> results = edges.toBlocking().getIterator();
@@ -1111,9 +1103,163 @@ public class GraphManagerIT {
 
 
     @Test
+    public void testMarkCompactSourceEdges() throws InterruptedException {
+
+        final GraphManager gm = emf.createEdgeManager( scope );
+
+        Id sourceId = new SimpleId( "source" );
+        Id targetId1 = new SimpleId( "target" );
+        Id targetId2 = new SimpleId( "target2" );
+
+
+        long startTime = System.currentTimeMillis();
+
+        long edge1Time = startTime;
+        long edge2Time = edge1Time + 1;
+
+        final long maxVersion = edge2Time;
+
+        Edge edge1 = createEdge( sourceId, "test", targetId1, edge1Time );
+
+        gm.writeEdge( edge1 ).toBlocking().singleOrDefault( null );
+
+        Edge edge2 = createEdge( sourceId, "test", targetId2, edge2Time );
+
+        gm.writeEdge( edge2 ).toBlocking().singleOrDefault( null );
+
+
+        assertTrue( Long.compare( maxVersion, edge2.getTimestamp() ) >= 0 );
+        assertTrue( Long.compare( maxVersion, edge1.getTimestamp() ) >= 0 );
+
+
+        gm.markEdge( edge1 ).toBlocking().last();
+
+
+        //get our 2 edges
+        Observable<Edge> edges = gm.loadEdgesFromSource(
+            createSearchByEdgeUnfiltered( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
+
+
+        Iterator<Edge> results = edges.toBlocking().getIterator();
+
+
+        assertEquals( "Edges correct", edge2.getTargetNode(), results.next().getTargetNode() );
+
+        assertEquals( "Edges correct", edge1.getTargetNode(), results.next().getTargetNode() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+
+        //now delete one of the edges
+
+        gm.deleteEdge( edge1 ).toBlocking().last();
+
+
+        edges = gm.loadEdgesFromSource(
+            createSearchByEdgeUnfiltered( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
+
+
+        results = edges.toBlocking().getIterator();
+
+        assertEquals( "Edges correct", edge2.getTargetNode(), results.next().getTargetNode() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+        //delete an edge we didn't mark, should be a no-op
+        gm.deleteEdge( edge2 ).toBlocking().lastOrDefault( null );
+
+        edges = gm.loadEdgesFromSource(
+                    createSearchByEdgeUnfiltered( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
+
+
+        results = edges.toBlocking().getIterator();
+
+
+        assertEquals( "Edges correct", edge2.getSourceNode(), results.next().getSourceNode() );
+
+        assertFalse( "No more edges", results.hasNext() );
+    }
+
+
+    @Test
+    public void testMarkCompactTargetEdges() {
+
+        final GraphManager gm = emf.createEdgeManager( scope );
+
+        Id sourceId1 = new SimpleId( "source" );
+        Id sourceId2 = new SimpleId( "source2" );
+        Id targetId = new SimpleId( "target" );
+
+        Edge edge1 = createEdge( sourceId1, "test", targetId, System.currentTimeMillis() );
+
+        gm.writeEdge( edge1 ).toBlocking().last();
+
+        Edge edge2 = createEdge( sourceId2, "test", targetId, System.currentTimeMillis() );
+
+        gm.writeEdge( edge2 ).toBlocking().last();
+
+
+        final long maxVersion = System.currentTimeMillis();
+
+
+        //now delete one of the edges
+
+        gm.markEdge( edge1 ).toBlocking().last();
+
+        //get our 2 edges
+        Observable<Edge> edges = gm.loadEdgesToTarget(
+            createSearchByEdgeUnfiltered( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) );
+
+
+        Iterator<Edge> results = edges.toBlocking().getIterator();
+
+
+        assertEquals( "Edges correct", edge2.getSourceNode(), results.next().getSourceNode() );
+
+        assertEquals( "Edges correct", edge1.getSourceNode(), results.next().getSourceNode() );
+
+
+        assertFalse( "No more edges", results.hasNext() );
+
+
+        //now delete one of the edges
+
+        gm.deleteEdge( edge1 ).toBlocking().last();
+
+        edges = gm.loadEdgesToTarget(
+            createSearchByEdgeUnfiltered( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) );
+
+
+        results = edges.toBlocking().getIterator();
+
+
+        assertEquals( "Edges correct", edge2.getSourceNode(), results.next().getSourceNode() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+        //now delete one of the edges
+
+
+        //delete an edge we didn't mark, should be a no-op
+        gm.deleteEdge( edge2 ).toBlocking().lastOrDefault( null );
+
+        edges = gm.loadEdgesToTarget(
+            createSearchByEdgeUnfiltered( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) );
+
+
+        results = edges.toBlocking().getIterator();
+
+
+        assertEquals( "Edges correct", edge2.getSourceNode(), results.next().getSourceNode() );
+
+        assertFalse( "No more edges", results.hasNext() );
+    }
+
+
+    @Test
     public void testMarkSourceEdgesType() {
 
-        final GraphManager gm =  emf.createEdgeManager( scope );
+        final GraphManager gm = emf.createEdgeManager( scope );
 
         Id sourceId = new SimpleId( "source" );
         Id targetId1 = new SimpleId( "target" );
@@ -1133,7 +1279,7 @@ public class GraphManagerIT {
 
         //get our 2 edges
         Observable<Edge> edges = gm.loadEdgesFromSourceByType(
-                createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) );
+            createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) );
 
 
         Iterator<Edge> results = edges.toBlocking().getIterator();
@@ -1149,7 +1295,7 @@ public class GraphManagerIT {
 
 
         edges = gm.loadEdgesFromSourceByType(
-                createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) );
+            createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) );
 
         results = edges.toBlocking().getIterator();
 
@@ -1158,7 +1304,7 @@ public class GraphManagerIT {
 
 
         edges = gm.loadEdgesFromSourceByType(
-                createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId2.getType(), null ) );
+            createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId2.getType(), null ) );
 
 
         results = edges.toBlocking().getIterator();
@@ -1174,7 +1320,7 @@ public class GraphManagerIT {
 
 
         edges = gm.loadEdgesFromSourceByType(
-                createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId2.getType(), null ) );
+            createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId2.getType(), null ) );
 
 
         results = edges.toBlocking().getIterator();
@@ -1191,7 +1337,7 @@ public class GraphManagerIT {
     @Test
     public void testMarkTargetEdgesType() {
 
-        final GraphManager gm =  emf.createEdgeManager( scope );
+        final GraphManager gm = emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id sourceId2 = new SimpleId( "source2" );
@@ -1210,7 +1356,7 @@ public class GraphManagerIT {
 
         //get our 2 edges
         Observable<Edge> edges = gm.loadEdgesToTargetByType(
-                createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) );
+            createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) );
 
 
         Iterator<Edge> results = edges.toBlocking().getIterator();
@@ -1226,8 +1372,7 @@ public class GraphManagerIT {
 
 
         edges = gm.loadEdgesToTargetByType(
-                createSearchByEdgeAndId( edge1.getSourceNode(), edge1.getType(), maxVersion, sourceId1.getType(),
-                        null ) );
+            createSearchByEdgeAndId( edge1.getSourceNode(), edge1.getType(), maxVersion, sourceId1.getType(), null ) );
 
         results = edges.toBlocking().getIterator();
 
@@ -1236,7 +1381,7 @@ public class GraphManagerIT {
 
 
         edges = gm.loadEdgesToTargetByType(
-                createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId2.getType(), null ) );
+            createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId2.getType(), null ) );
 
 
         results = edges.toBlocking().getIterator();
@@ -1252,7 +1397,7 @@ public class GraphManagerIT {
 
 
         edges = gm.loadEdgesToTargetByType(
-                createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId2.getType(), null ) );
+            createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId2.getType(), null ) );
 
 
         results = edges.toBlocking().getIterator();
@@ -1269,7 +1414,7 @@ public class GraphManagerIT {
     @Test
     public void markSourceNode() {
 
-        final GraphManager gm =  emf.createEdgeManager( scope );
+        final GraphManager gm = emf.createEdgeManager( scope );
 
         Id sourceId = new SimpleId( "source" );
         Id targetId1 = new SimpleId( "target" );
@@ -1287,8 +1432,8 @@ public class GraphManagerIT {
         final long maxVersion = System.currentTimeMillis();
 
         Iterator<Edge> results =
-                gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) )
-                  .toBlocking().getIterator();
+            gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking()
+              .getIterator();
 
 
         assertEquals( "Edge found", edge2, results.next() );
@@ -1300,8 +1445,8 @@ public class GraphManagerIT {
 
         //get our 2 edges
         results = gm.loadEdgesFromSourceByType(
-                createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) )
-                    .toBlocking().getIterator();
+            createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) ).toBlocking()
+                    .getIterator();
 
 
         assertEquals( "Edges correct", edge1, results.next() );
@@ -1310,8 +1455,8 @@ public class GraphManagerIT {
 
         //now delete one of the edges
         results = gm.loadEdgesFromSourceByType(
-                createSearchByEdgeAndId( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) )
-                    .toBlocking().getIterator();
+            createSearchByEdgeAndId( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) ).toBlocking()
+                    .getIterator();
 
 
         assertEquals( "Edges correct", edge2, results.next() );
@@ -1324,26 +1469,61 @@ public class GraphManagerIT {
 
         //now re-read, nothing should be there since they're marked
 
-        results = gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) )
-                    .toBlocking().getIterator();
+        results =
+            gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking()
+              .getIterator();
 
         assertFalse( "No more edges", results.hasNext() );
 
 
         //get our 2 edges
         results = gm.loadEdgesFromSourceByType(
-                createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) )
-                    .toBlocking().getIterator();
+            createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) ).toBlocking()
+                    .getIterator();
 
 
         assertFalse( "No more edges", results.hasNext() );
 
+
         //now delete one of the edges
         results = gm.loadEdgesFromSourceByType(
-                createSearchByEdgeAndId( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) )
+            createSearchByEdgeAndId( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) ).toBlocking()
+                    .getIterator();
+
+
+        assertFalse( "No more edges", results.hasNext() );
+
+
+        //test they come back unfiltered
+
+        //read with filter marked off to ensure they're still persisted
+        results = gm.loadEdgesFromSource( createSearchByEdgeUnfiltered( sourceId, edge1.getType(), maxVersion, null ) )
+                    .toBlocking().getIterator();
+
+
+        assertEquals( "Edge found", edge2, results.next() );
+
+        assertEquals( "Edge found", edge1, results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+
+        results = gm.loadEdgesFromSourceByType(
+            createSearchByEdgeAndIdUnfiltered( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) )
                     .toBlocking().getIterator();
 
 
+        assertEquals( "Edges correct", edge1, results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+        results = gm.loadEdgesFromSourceByType(
+            createSearchByEdgeAndIdUnfiltered( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) )
+                    .toBlocking().getIterator();
+
+
+        assertEquals( "Edges correct", edge2, results.next() );
+
         assertFalse( "No more edges", results.hasNext() );
     }
 
@@ -1351,7 +1531,7 @@ public class GraphManagerIT {
     @Test
     public void markTargetNode() {
 
-        final GraphManager gm =  emf.createEdgeManager( scope );
+        final GraphManager gm = emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id sourceId2 = new SimpleId( "source2" );
@@ -1369,8 +1549,8 @@ public class GraphManagerIT {
         final long maxVersion = System.currentTimeMillis();
 
         Iterator<Edge> results =
-                gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) )
-                  .toBlocking().getIterator();
+            gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking()
+              .getIterator();
 
 
         assertEquals( "Edge found", edge2, results.next() );
@@ -1382,7 +1562,69 @@ public class GraphManagerIT {
 
         //get our 2 edges
         results = gm.loadEdgesToTargetByType(
-                createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) )
+            createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) ).toBlocking()
+                    .getIterator();
+
+
+        assertEquals( "Edges correct", edge1, results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+        //now delete one of the edges
+        results = gm.loadEdgesToTargetByType(
+            createSearchByEdgeAndId( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) ).toBlocking()
+                    .getIterator();
+
+
+        assertEquals( "Edges correct", edge2, results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+        //mark the source node
+        gm.markNode( targetId, edge2.getTimestamp() ).toBlocking().last();
+
+
+        //now re-read, nothing should be there since they're marked
+
+        results = gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking()
+                    .getIterator();
+
+        assertFalse( "No more edges", results.hasNext() );
+
+
+        //get our 2 edges
+        results = gm.loadEdgesToTargetByType(
+            createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) ).toBlocking()
+                    .getIterator();
+
+
+        assertFalse( "No more edges", results.hasNext() );
+
+        //now delete one of the edges
+        results = gm.loadEdgesToTargetByType(
+            createSearchByEdgeAndId( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) ).toBlocking()
+                    .getIterator();
+
+
+        assertFalse( "No more edges", results.hasNext() );
+
+
+        //now test they come back when unfiltered
+
+        results = gm.loadEdgesToTarget( createSearchByEdgeUnfiltered( targetId, edge1.getType(), maxVersion, null ) )
+                    .toBlocking().getIterator();
+
+
+        assertEquals( "Edge found", edge2, results.next() );
+
+        assertEquals( "Edge found", edge1, results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+
+        //get our 2 edges
+        results = gm.loadEdgesToTargetByType(
+            createSearchByEdgeAndIdUnfiltered( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) )
                     .toBlocking().getIterator();
 
 
@@ -1392,10 +1634,180 @@ public class GraphManagerIT {
 
         //now delete one of the edges
         results = gm.loadEdgesToTargetByType(
-                createSearchByEdgeAndId( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) )
+            createSearchByEdgeAndIdUnfiltered( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) )
+                    .toBlocking().getIterator();
+
+
+        assertEquals( "Edges correct", edge2, results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+    }
+
+
+    @Test
+    public void markDeleteSourceNode() {
+
+        final GraphManager gm = emf.createEdgeManager( scope );
+
+        Id sourceId = new SimpleId( "source" );
+        Id targetId1 = new SimpleId( "target" );
+        Id targetId2 = new SimpleId( "target2" );
+
+        Edge edge1 = createEdge( sourceId, "test", targetId1, System.currentTimeMillis() );
+
+        gm.writeEdge( edge1 ).toBlocking().singleOrDefault( null );
+
+        Edge edge2 = createEdge( sourceId, "test", targetId2, System.currentTimeMillis() );
+
+        gm.writeEdge( edge2 ).toBlocking().singleOrDefault( null );
+
+
+        final long maxVersion = System.currentTimeMillis();
+
+        Iterator<Edge> results =
+            gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking()
+              .getIterator();
+
+
+        assertEquals( "Edge found", edge2, results.next() );
+
+        assertEquals( "Edge found", edge1, results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+
+        //get our 2 edges
+        results = gm.loadEdgesFromSourceByType(
+            createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) ).toBlocking()
+                    .getIterator();
+
+
+        assertEquals( "Edges correct", edge1, results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+        //now delete one of the edges
+        results = gm.loadEdgesFromSourceByType(
+            createSearchByEdgeAndId( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) ).toBlocking()
+                    .getIterator();
+
+
+        assertEquals( "Edges correct", edge2, results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+        //mark the source node
+        gm.markNode( sourceId, edge2.getTimestamp() ).toBlocking().last();
+
+
+        //test they come back unfiltered
+
+        //read with filter marked off to ensure they're still persisted
+        results = gm.loadEdgesFromSource( createSearchByEdgeUnfiltered( sourceId, edge1.getType(), maxVersion, null ) )
+                    .toBlocking().getIterator();
+
+
+        assertEquals( "Edge found", edge2, results.next() );
+
+        assertEquals( "Edge found", edge1, results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+
+        results = gm.loadEdgesFromSourceByType(
+            createSearchByEdgeAndIdUnfiltered( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) )
+                    .toBlocking().getIterator();
+
+
+        assertEquals( "Edges correct", edge1, results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+        results = gm.loadEdgesFromSourceByType(
+            createSearchByEdgeAndIdUnfiltered( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) )
+                    .toBlocking().getIterator();
+
+
+        assertEquals( "Edges correct", edge2, results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+        //now delete them
+
+
+        gm.compactNode( sourceId ).toBlocking().last();
+
+        results = gm.loadEdgesFromSource( createSearchByEdgeUnfiltered( sourceId, edge1.getType(), maxVersion, null ) )
                     .toBlocking().getIterator();
 
 
+        assertFalse( "No more edges", results.hasNext() );
+
+
+        results = gm.loadEdgesFromSourceByType(
+            createSearchByEdgeAndIdUnfiltered( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) )
+                    .toBlocking().getIterator();
+
+
+        assertFalse( "No more edges", results.hasNext() );
+
+        results = gm.loadEdgesFromSourceByType(
+            createSearchByEdgeAndIdUnfiltered( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) )
+                    .toBlocking().getIterator();
+
+
+        assertFalse( "No more edges", results.hasNext() );
+    }
+
+
+    @Test
+    public void markDeleteTargetNode() {
+
+        final GraphManager gm = emf.createEdgeManager( scope );
+
+        Id sourceId1 = new SimpleId( "source" );
+        Id sourceId2 = new SimpleId( "source2" );
+        Id targetId = new SimpleId( "target" );
+
+        Edge edge1 = createEdge( sourceId1, "test", targetId, System.currentTimeMillis() );
+
+        gm.writeEdge( edge1 ).toBlocking().singleOrDefault( null );
+
+        Edge edge2 = createEdge( sourceId2, "test", targetId, System.currentTimeMillis() );
+
+        gm.writeEdge( edge2 ).toBlocking().singleOrDefault( null );
+
+
+        final long maxVersion = System.currentTimeMillis();
+
+        Iterator<Edge> results =
+            gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking()
+              .getIterator();
+
+
+        assertEquals( "Edge found", edge2, results.next() );
+
+        assertEquals( "Edge found", edge1, results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+
+        //get our 2 edges
+        results = gm.loadEdgesToTargetByType(
+            createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) ).toBlocking()
+                    .getIterator();
+
+
+        assertEquals( "Edges correct", edge1, results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+        //now delete one of the edges
+        results = gm.loadEdgesToTargetByType(
+            createSearchByEdgeAndId( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) ).toBlocking()
+                    .getIterator();
+
+
         assertEquals( "Edges correct", edge2, results.next() );
 
         assertFalse( "No more edges", results.hasNext() );
@@ -1406,15 +1818,55 @@ public class GraphManagerIT {
 
         //now re-read, nothing should be there since they're marked
 
-        results = gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) )
+
+        //now test they come back when unfiltered
+
+        results = gm.loadEdgesToTarget( createSearchByEdgeUnfiltered( targetId, edge1.getType(), maxVersion, null ) )
+                    .toBlocking().getIterator();
+
+
+        assertEquals( "Edge found", edge2, results.next() );
+
+        assertEquals( "Edge found", edge1, results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+
+        //get our 2 edges
+        results = gm.loadEdgesToTargetByType(
+            createSearchByEdgeAndIdUnfiltered( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) )
+                    .toBlocking().getIterator();
+
+
+        assertEquals( "Edges correct", edge1, results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+        //now delete one of the edges
+        results = gm.loadEdgesToTargetByType(
+            createSearchByEdgeAndIdUnfiltered( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) )
                     .toBlocking().getIterator();
 
+
+        assertEquals( "Edges correct", edge2, results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+        //now compact, everything should be removed
+
+        gm.compactNode( targetId ).toBlocking().last();
+
+
+        results = gm.loadEdgesToTarget( createSearchByEdgeUnfiltered( targetId, edge1.getType(), maxVersion, null ) )
+                    .toBlocking().getIterator();
+
+
         assertFalse( "No more edges", results.hasNext() );
 
 
         //get our 2 edges
         results = gm.loadEdgesToTargetByType(
-                createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) )
+            createSearchByEdgeAndIdUnfiltered( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) )
                     .toBlocking().getIterator();
 
 
@@ -1422,7 +1874,7 @@ public class GraphManagerIT {
 
         //now delete one of the edges
         results = gm.loadEdgesToTargetByType(
-                createSearchByEdgeAndId( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) )
+            createSearchByEdgeAndIdUnfiltered( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) )
                     .toBlocking().getIterator();
 
 
@@ -1433,7 +1885,7 @@ public class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypesSourceTypesPrefix() {
 
-        final GraphManager gm =  emf.createEdgeManager( scope );
+        final GraphManager gm = emf.createEdgeManager( scope );
 
         Id sourceId = new SimpleId( "source" );
         Id targetId = new SimpleId( "target" );
@@ -1454,7 +1906,7 @@ public class GraphManagerIT {
 
         //get our 2 edge types
         Observable<String> edges =
-                gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), "test1", null ) );
+            gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), "test1", null ) );
 
 
         Iterator<String> results = edges.toBlocking().getIterator();
@@ -1483,7 +1935,7 @@ public class GraphManagerIT {
     public void testSourceSubTypes() {
 
         //now test sub edges
-        final GraphManager gm =  emf.createEdgeManager( scope );
+        final GraphManager gm = emf.createEdgeManager( scope );
 
         Id sourceId = new SimpleId( "source" );
         Id targetId1target1 = new SimpleId( "type1target1" );
@@ -1504,8 +1956,8 @@ public class GraphManagerIT {
         gm.writeEdge( test2TargetEdge ).toBlocking().singleOrDefault( null );
 
 
-        Observable<String> edges = gm.getIdTypesFromSource(
-                new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test", "type1", null ) );
+        Observable<String> edges =
+            gm.getIdTypesFromSource( new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test", "type1", null ) );
 
         Iterator<String> results = edges.toBlocking().getIterator();
 
@@ -1517,8 +1969,8 @@ public class GraphManagerIT {
         assertFalse( "No results", results.hasNext() );
 
         //now get types for test2
-        edges = gm.getIdTypesFromSource(
-                new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test", "type2", null ) );
+        edges =
+            gm.getIdTypesFromSource( new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test", "type2", null ) );
 
         results = edges.toBlocking().getIterator();
 
@@ -1532,7 +1984,7 @@ public class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypesTargetTypesPrefix() {
 
-        final GraphManager gm =  emf.createEdgeManager( scope );
+        final GraphManager gm = emf.createEdgeManager( scope );
 
         Id targetId = new SimpleId( "target" );
         Id sourceId = new SimpleId( "source" );
@@ -1553,7 +2005,7 @@ public class GraphManagerIT {
 
         //get our 2 edge types
         Observable<String> edges =
-                gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( testTargetEdge.getTargetNode(), "test1", null ) );
+            gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( testTargetEdge.getTargetNode(), "test1", null ) );
 
 
         Iterator<String> results = edges.toBlocking().getIterator();
@@ -1582,7 +2034,7 @@ public class GraphManagerIT {
     public void testTargetSubTypes() {
 
         //now test sub edges
-        final GraphManager gm =  emf.createEdgeManager( scope );
+        final GraphManager gm = emf.createEdgeManager( scope );
 
         Id targetId = new SimpleId( "target" );
         Id sourceId1target1 = new SimpleId( "type1source1" );
@@ -1603,8 +2055,8 @@ public class GraphManagerIT {
         gm.writeEdge( test2TargetEdge ).toBlocking().singleOrDefault( null );
 
 
-        Observable<String> edges = gm.getIdTypesToTarget(
-                new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", "type1", null ) );
+        Observable<String> edges =
+            gm.getIdTypesToTarget( new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", "type1", null ) );
 
         Iterator<String> results = edges.toBlocking().getIterator();
 
@@ -1616,8 +2068,8 @@ public class GraphManagerIT {
         assertFalse( "No results", results.hasNext() );
 
         //now get types for test2
-        edges = gm.getIdTypesToTarget(
-                new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", "type2", null ) );
+        edges =
+            gm.getIdTypesToTarget( new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", "type2", null ) );
 
         results = edges.toBlocking().getIterator();
 
@@ -1629,7 +2081,7 @@ public class GraphManagerIT {
 
 
     @Test( expected = NullPointerException.class )
-    public void invalidEdgeTypesWrite(  ) {
+    public void invalidEdgeTypesWrite() {
         final GraphManager em = emf.createEdgeManager( scope );
 
         em.writeEdge( null );
@@ -1637,7 +2089,7 @@ public class GraphManagerIT {
 
 
     @Test( expected = NullPointerException.class )
-    public void invalidEdgeTypesDelete( ) {
+    public void invalidEdgeTypesDelete() {
         final GraphManager em = emf.createEdgeManager( scope );
 
         em.markEdge( null );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59ad6a98/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
index b2142e8..718fc70 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
@@ -152,6 +152,19 @@ public class EdgeTestUtils {
         return new SimpleSearchByEdgeType( sourceId, type, maxVersion, SearchByEdgeType.Order.DESCENDING, last );
     }
 
+    /**
+       *
+       * @param sourceId
+       * @param type
+       * @param maxVersion
+       * @param last
+       * @return
+       */
+      public static SearchByEdgeType createSearchByEdgeUnfiltered( final Id sourceId, final String type, final long maxVersion,
+                                                         final Edge last ) {
+          return new SimpleSearchByEdgeType( sourceId, type, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.fromNullable( last ) , false );
+      }
+
 
     /**
      *
@@ -167,6 +180,20 @@ public class EdgeTestUtils {
         return new SimpleSearchByIdType( sourceId, type, maxVersion, SearchByEdgeType.Order.DESCENDING, idType,  Optional.fromNullable(last) );
     }
 
+    /**
+      *
+      * @param sourceId
+      * @param type
+      * @param maxVersion
+      * @param idType
+      * @param last
+      * @return
+      */
+     public static SearchByIdType createSearchByEdgeAndIdUnfiltered( final Id sourceId, final String type, final long maxVersion,
+                                                           final String idType, final Edge last ) {
+         return new SimpleSearchByIdType( sourceId, type, maxVersion, SearchByEdgeType.Order.DESCENDING, idType,  Optional.fromNullable(last), false );
+     }
+
 
     /**
      *