You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/26 22:21:05 UTC

[21/35] Merged hystrix into asyncqueue

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerTimeoutIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerTimeoutIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerTimeoutIT.java
new file mode 100644
index 0000000..509fe6a
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerTimeoutIT.java
@@ -0,0 +1,1562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph;
+
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jukito.All;
+import org.jukito.JukitoRunner;
+import org.jukito.UseModules;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.hystrix.exception.HystrixRuntimeException;
+
+import rx.Observable;
+import rx.Subscriber;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdgeAndId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+@RunWith( JukitoRunner.class )
+@UseModules( { TestGraphModule.class } )
+//@UseModules( { TestGraphModule.class, GraphManagerIT.InvalidInput.class } )
+public class GraphManagerTimeoutIT {
+
+    /**
+     * Upper bound, in milliseconds, for the configured graph read timeout. setup() fails the test when
+     * graphFig.getReadTimeout() is larger than this value.
+     */
+    private static final long TIMEOUT = 30000;
+
+    //class-level JUnit rule; presumably makes a Cassandra instance available to the tests - confirm in CassandraRule
+    @ClassRule
+    public static CassandraRule rule = new CassandraRule();
+
+
+    //injected and then applied as a per-test JUnit rule; presumably runs the schema migration - confirm in
+    //MigrationManagerRule
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
+    //factory each test uses to create a GraphManager for the current scope
+    @Inject
+    protected GraphManagerFactory emf;
+
+    //graph configuration; setup() reads the read timeout from it
+    @Inject
+    protected GraphFig graphFig;
+
+    //mocked organization scope, rebuilt before every test in setup()
+    protected OrganizationScope scope;
+
+
+    /**
+     * Builds a mocked organization scope backed by a fresh time UUID and verifies that the configured graph read
+     * timeout does not exceed {@link #TIMEOUT}.
+     */
+    @Before
+    public void setup() {
+        final UUID organizationUuid = UUIDGenerator.newTimeUUID();
+
+        final Id organizationId = mock( Id.class );
+        when( organizationId.getType() ).thenReturn( "organization" );
+        when( organizationId.getUuid() ).thenReturn( organizationUuid );
+
+        scope = mock( OrganizationScope.class );
+        when( scope.getOrganization() ).thenReturn( organizationId );
+
+        //the tests are only meaningful when the read timeout fits inside TIMEOUT
+        assertTrue( "Graph read timeout must be <= " + TIMEOUT + ".  Otherwise tests are invalid",
+                graphFig.getReadTimeout() <= TIMEOUT );
+    }
+
+
+    //    @Test(timeout = TIMEOUT, expected = TimeoutException.class)
+    /**
+     * Verifies that a read which never finishes is terminated with an error rather than hanging the caller.
+     *
+     * The serialization is stubbed to return an iterator that yields one edge and then blocks, so the subscriber is
+     * expected to see exactly one onNext followed by an onError carrying a HystrixRuntimeException.
+     *
+     * @param serialization the edge serialization, stubbed below via Mockito's when()
+     * @throws InterruptedException if the test thread is interrupted while waiting for the subscriber to terminate
+     */
+    @Test
+    public void testWriteReadEdgeTypeSource( EdgeSerialization serialization ) throws InterruptedException {
+
+        final GraphManager em = emf.createGraphManager( scope );
+
+        final MarkedEdge edge = createEdge( "source", "edge", "target" );
+
+        SearchByEdgeType search = createSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getVersion(), null );
+
+        //returns the single edge, then blocks on the following call to next()
+        final MockingIterator<MarkedEdge> itr = new MockingIterator<>( Collections.singletonList( edge ) );
+
+        when( serialization.getEdgesFromSource( scope, search ) ).thenReturn( itr );
+
+        Observable<Edge> edges = em.loadEdgesFromSource( search );
+
+        final AtomicInteger onNextCounter = new AtomicInteger();
+        final CountDownLatch terminalLatch = new CountDownLatch( 1 );
+
+        //written by the subscriber before it counts the latch down, read by this thread after await() returns
+        final Throwable[] thrown = new Throwable[1];
+
+        edges.subscribe( new Subscriber<Edge>() {
+            @Override
+            public void onCompleted() {
+                //not expected; release the latch anyway so the assertions below fail fast instead of waiting
+                terminalLatch.countDown();
+            }
+
+
+            @Override
+            public void onError( final Throwable e ) {
+                thrown[0] = e;
+                terminalLatch.countDown();
+            }
+
+
+            @Override
+            public void onNext( final Edge edge ) {
+                onNextCounter.incrementAndGet();
+            }
+        } );
+
+        //setup() rejects read timeouts larger than TIMEOUT, so twice that is ample. Bounding the wait keeps a
+        //regression (the timeout never firing) from hanging the whole build
+        assertTrue( "Subscriber was not terminated within " + ( TIMEOUT * 2 ) + " ms",
+                terminalLatch.await( TIMEOUT * 2, TimeUnit.MILLISECONDS ) );
+
+        assertEquals( "One element was produced", 1, onNextCounter.intValue() );
+        assertTrue( "Expected a HystrixRuntimeException but got: " + thrown[0],
+                thrown[0] instanceof HystrixRuntimeException );
+    }
+
+
+    /**
+     * Iterator used to simulate a read that never completes. It hands out the supplied items and, once they are
+     * exhausted, blocks the calling thread inside {@link #next()} until that thread is interrupted.
+     *
+     * {@link #hasNext()} deliberately always reports true so that consumers keep calling {@link #next()} and run
+     * into the blocking call.
+     *
+     * @param <T> the element type
+     */
+    private static class MockingIterator<T> implements Iterator<T> {
+
+        private final Iterator<T> items;
+
+        //created with zero permits and never released, so acquire() only returns via interruption
+        private final Semaphore semaphore = new Semaphore( 0 );
+
+
+        private MockingIterator( final Collection<T> items ) {
+            this.items = items.iterator();
+        }
+
+
+        /**
+         * @return always true, even after the wrapped items are exhausted
+         */
+        @Override
+        public boolean hasNext() {
+            return true;
+        }
+
+
+        /**
+         * Returns the next wrapped item while any remain; afterwards blocks on the semaphore.
+         *
+         * @return the next item
+         * @throws RuntimeException wrapping the InterruptedException if the blocked thread is interrupted
+         */
+        @Override
+        public T next() {
+            if ( items.hasNext() ) {
+                return items.next();
+            }
+
+            //block indefinitely
+            try {
+                semaphore.acquire();
+            }
+            catch ( InterruptedException e ) {
+                //restore the interrupt status so code further up the stack can still observe the interruption
+                Thread.currentThread().interrupt();
+                throw new RuntimeException( e );
+            }
+
+            //only reachable if a permit were ever released, which this class never does
+            return null;
+        }
+
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException( "Cannot remove" );
+        }
+    }
+
+
+    /**
+     * Writes a single edge, reads it back by target node and edge type, and checks that a search using a different
+     * edge type returns nothing.
+     */
+    @Test
+    public void testWriteReadEdgeTypeTarget() {
+
+        GraphManager em = emf.createGraphManager( scope );
+
+
+        Edge edge = createEdge( "source", "test", "target" );
+
+        //block until the write observable has completed
+        em.writeEdge( edge ).toBlockingObservable().last();
+
+        //now test retrieving it
+
+        SearchByEdgeType search = createSearchByEdge( edge.getTargetNode(), edge.getType(), edge.getVersion(), null );
+
+        Observable<Edge> edges = em.loadEdgesToTarget( search );
+
+        //implicitly blows up if more than 1 is returned from "single"
+        Edge returned = edges.toBlockingObservable().single();
+
+        assertEquals( "Correct edge returned", edge, returned );
+
+        //change edge type to be invalid, shouldn't get a result
+        search = createSearchByEdge( edge.getTargetNode(), edge.getType() + "invalid", edge.getVersion(), null );
+
+        edges = em.loadEdgesToTarget( search );
+
+        //null when nothing is emitted; still blows up if more than 1 is returned
+        returned = edges.toBlockingObservable().singleOrDefault( null );
+
+        assertNull( "Invalid type should not be returned", returned );
+    }
+
+
+    /**
+     * Writes a single edge, reads it back by source node using the edge's own version as the max version, and checks
+     * that a search using a version generated before the edge was created returns nothing.
+     */
+    @Test
+    public void testWriteReadEdgeTypeVersionSource() {
+
+        GraphManager em = emf.createGraphManager( scope );
+
+        //generated before the edge is created; used below as a max version the edge should not satisfy
+        final UUID earlyVersion = UUIDGenerator.newTimeUUID();
+
+
+        Edge edge = createEdge( "source", "test", "target" );
+
+        //block until the write observable has completed
+        em.writeEdge( edge ).toBlockingObservable().last();
+
+        //now test retrieving it
+
+        SearchByEdgeType search = createSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getVersion(), null );
+
+        Observable<Edge> edges = em.loadEdgesFromSource( search );
+
+        //implicitly blows up if more than 1 is returned from "single"
+        Edge returned = edges.toBlockingObservable().single();
+
+        assertEquals( "Correct edge returned", edge, returned );
+
+        //now test with an earlier version, we shouldn't get the edge back
+        search = createSearchByEdge( edge.getSourceNode(), edge.getType(), earlyVersion, null );
+
+        edges = em.loadEdgesFromSource( search );
+
+        //null when nothing is emitted; still blows up if more than 1 is returned
+        returned = edges.toBlockingObservable().singleOrDefault( null );
+
+        assertNull( "Earlier version should not be returned", returned );
+    }
+
+
+    /**
+     * Writes a single edge, reads it back by target node using the edge's own version as the max version, and checks
+     * that a search using a version generated before the edge was created returns nothing.
+     */
+    @Test
+    public void testWriteReadEdgeTypeVersionTarget() {
+
+        GraphManager em = emf.createGraphManager( scope );
+
+
+        //generated before the edge is created; used below as a max version the edge should not satisfy
+        final UUID earlyVersion = UUIDGenerator.newTimeUUID();
+
+
+        Edge edge = createEdge( "source", "test", "target" );
+
+        //block until the write observable has completed
+        em.writeEdge( edge ).toBlockingObservable().last();
+
+        //now test retrieving it
+
+        SearchByEdgeType search = createSearchByEdge( edge.getTargetNode(), edge.getType(), edge.getVersion(), null );
+
+        Observable<Edge> edges = em.loadEdgesToTarget( search );
+
+        //implicitly blows up if more than 1 is returned from "single"
+        Edge returned = edges.toBlockingObservable().single();
+
+        assertEquals( "Correct edge returned", edge, returned );
+
+        //now test with an earlier version, we shouldn't get the edge back
+        search = createSearchByEdge( edge.getTargetNode(), edge.getType(), earlyVersion, null );
+
+        edges = em.loadEdgesToTarget( search );
+
+        //null when nothing is emitted; still blows up if more than 1 is returned
+        returned = edges.toBlockingObservable().singleOrDefault( null );
+
+        assertNull( "Earlier version should not be returned", returned );
+    }
+
+
+    /**
+     * Tests that if multiple versions of an edge exist, only the distinct edges with a version <= max are returned.
+     *
+     * Three edges sharing source, type and target are written one after another. Searching from the source with each
+     * edge's version as the max is expected to return exactly that edge, and searching with a version generated
+     * before any of them is expected to return nothing.
+     */
+    @Test
+    public void testWriteReadEdgeTypeVersionSourceDistinct() {
+
+        GraphManager em = emf.createGraphManager( scope );
+
+        //generated before any edge is created
+        final UUID earlyVersion = UUIDGenerator.newTimeUUID();
+
+
+        Edge edge1 = createEdge( "source", "test", "target" );
+
+        final Id sourceId = edge1.getSourceNode();
+        final Id targetId = edge1.getTargetNode();
+
+
+        em.writeEdge( edge1 ).toBlockingObservable().last();
+
+        //same source, type and target as edge1, created later
+        Edge edge2 = createEdge( sourceId, edge1.getType(), targetId );
+
+        em.writeEdge( edge2 ).toBlockingObservable().last();
+
+        Edge edge3 = createEdge( sourceId, edge1.getType(), targetId );
+
+        em.writeEdge( edge3 ).toBlockingObservable().last();
+
+
+        //now test retrieving it, we should only get edge3, since it's the latest
+
+        SearchByEdgeType search =
+                createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getVersion(), null );
+
+        Observable<Edge> edges = em.loadEdgesFromSource( search );
+
+        //iterate so both the returned edge and the end of the stream can be asserted
+        Iterator<Edge> returned = edges.toBlockingObservable().getIterator();
+
+        assertEquals( "Correct edge returned", edge3, returned.next() );
+        assertFalse( "No more edges", returned.hasNext() );
+
+        //now use edge2's version as the max, only edge2 is expected back
+        search = createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge2.getVersion(), null );
+
+        edges = em.loadEdgesFromSource( search );
+
+        returned = edges.toBlockingObservable().getIterator();
+
+        assertEquals( "Correct edge returned", edge2, returned.next() );
+        assertFalse( "No more edges", returned.hasNext() );
+
+        //now use edge1's version as the max, only edge1 is expected back
+        search = createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getVersion(), null );
+
+        edges = em.loadEdgesFromSource( search );
+
+        returned = edges.toBlockingObservable().getIterator();
+
+        assertEquals( "Correct edge returned", edge1, returned.next() );
+        assertFalse( "No more edges", returned.hasNext() );
+
+
+        //a version from before any of the edges, nothing should come back
+        search = createSearchByEdge( edge1.getSourceNode(), edge1.getType(), earlyVersion, null );
+
+        edges = em.loadEdgesFromSource( search );
+
+        returned = edges.toBlockingObservable().getIterator();
+
+        assertFalse( "No more edges", returned.hasNext() );
+    }
+
+
+    /**
+     * Target-side counterpart of the distinct version test. Three edges sharing source, type and target are written
+     * one after another. Searching to the target with each edge's version as the max is expected to return exactly
+     * that edge, and searching with a version generated before any of them is expected to return nothing.
+     */
+    @Test
+    public void testWriteReadEdgeTypeVersionTargetDistinct() {
+
+
+        GraphManager em = emf.createGraphManager( scope );
+
+        //generated before any edge is created
+        final UUID earlyVersion = UUIDGenerator.newTimeUUID();
+
+
+        Edge edge1 = createEdge( "source", "test", "target" );
+
+        final Id sourceId = edge1.getSourceNode();
+        final Id targetId = edge1.getTargetNode();
+
+
+        em.writeEdge( edge1 ).toBlockingObservable().last();
+
+        //same source, type and target as edge1, created later
+        Edge edge2 = createEdge( sourceId, edge1.getType(), targetId );
+
+        em.writeEdge( edge2 ).toBlockingObservable().last();
+
+        Edge edge3 = createEdge( sourceId, edge1.getType(), targetId );
+
+        em.writeEdge( edge3 ).toBlockingObservable().last();
+
+
+        //now test retrieving it, we should only get edge3, since it's the latest
+
+        SearchByEdgeType search =
+                createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getVersion(), null );
+
+        Observable<Edge> edges = em.loadEdgesToTarget( search );
+
+        //iterate so both the returned edge and the end of the stream can be asserted
+        Iterator<Edge> returned = edges.toBlockingObservable().getIterator();
+
+        assertEquals( "Correct edge returned", edge3, returned.next() );
+        assertFalse( "No more edges", returned.hasNext() );
+
+        //now use edge2's version as the max, only edge2 is expected back
+        search = createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge2.getVersion(), null );
+
+        edges = em.loadEdgesToTarget( search );
+
+        returned = edges.toBlockingObservable().getIterator();
+
+        assertEquals( "Correct edge returned", edge2, returned.next() );
+        assertFalse( "No more edges", returned.hasNext() );
+
+        //now use edge1's version as the max, only edge1 is expected back
+        search = createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge1.getVersion(), null );
+
+        edges = em.loadEdgesToTarget( search );
+
+        returned = edges.toBlockingObservable().getIterator();
+
+        assertEquals( "Correct edge returned", edge1, returned.next() );
+        assertFalse( "No more edges", returned.hasNext() );
+
+
+        //a version from before any of the edges, nothing should come back
+        search = createSearchByEdge( edge1.getTargetNode(), edge1.getType(), earlyVersion, null );
+
+        edges = em.loadEdgesToTarget( search );
+
+        returned = edges.toBlockingObservable().getIterator();
+
+        assertFalse( "No more edges", returned.hasNext() );
+    }
+
+
+    /**
+     * Writes three edges of the same type from one source to three distinct targets, then checks that a search from
+     * the source bounded by the first edge's version returns only that edge, and that a search bounded by the third
+     * edge's version and given edge2 as the last argument returns only edge3.
+     */
+    @Test
+    public void testWriteReadEdgeTypePagingSource() {
+
+        GraphManager em = emf.createGraphManager( scope );
+
+        final Id sourceId = createId( "source" );
+
+
+        Edge edge1 = createEdge( sourceId, "test", createId( "target" ) );
+
+        em.writeEdge( edge1 ).toBlockingObservable().last();
+
+        Edge edge2 = createEdge( sourceId, "test", createId( "target" ) );
+
+        em.writeEdge( edge2 ).toBlockingObservable().last();
+
+        Edge edge3 = createEdge( sourceId, "test", createId( "target" ) );
+
+        em.writeEdge( edge3 ).toBlockingObservable().last();
+
+
+        //now test retrieving it
+
+        SearchByEdgeType search =
+                createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getVersion(), null );
+
+        Observable<Edge> edges = em.loadEdgesFromSource( search );
+
+        //iterate so both the returned edge and the end of the stream can be asserted
+        Iterator<Edge> returned = edges.toBlockingObservable().getIterator();
+
+
+        //we have 3 edges, but we specified our first edge as the max, we shouldn't get any more results than the first
+        assertEquals( "Correct edge returned", edge1, returned.next() );
+
+        assertFalse( "No more edges", returned.hasNext() );
+
+        //edge2 is passed as the last argument, presumably the paging cursor (last edge already seen) - confirm in
+        //EdgeTestUtils.createSearchByEdge
+        search = createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getVersion(), edge2 );
+
+        edges = em.loadEdgesFromSource( search );
+
+        returned = edges.toBlockingObservable().getIterator();
+
+        assertEquals( "Paged correctly", edge3, returned.next() );
+
+        assertFalse( "End of stream", returned.hasNext() );
+    }
+
+
+    /**
+     * Writes three edges of the same type from three distinct sources to one target, then checks that a search to
+     * the target bounded by the first edge's version returns only that edge, and that a search bounded by the third
+     * edge's version and given edge2 as the last argument returns only edge3.
+     */
+    @Test
+    public void testWriteReadEdgeTypePagingTarget() {
+
+
+        GraphManager em = emf.createGraphManager( scope );
+
+
+        final Id targetId = createId( "target" );
+
+        Edge edge1 = createEdge( createId( "source" ), "test", targetId );
+
+        em.writeEdge( edge1 ).toBlockingObservable().last();
+
+        Edge edge2 = createEdge( createId( "source" ), "test", targetId );
+
+        em.writeEdge( edge2 ).toBlockingObservable().last();
+
+        Edge edge3 = createEdge( createId( "source" ), "test", targetId );
+
+        em.writeEdge( edge3 ).toBlockingObservable().last();
+
+
+        //now test retrieving it
+
+        SearchByEdgeType search =
+                createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge1.getVersion(), null );
+
+        Observable<Edge> edges = em.loadEdgesToTarget( search );
+
+        //iterate so both the returned edge and the end of the stream can be asserted
+        Iterator<Edge> returned = edges.toBlockingObservable().getIterator();
+
+
+        //we have 3 edges, but we specified our first edge as the max, we shouldn't get any more results than the first
+        assertEquals( "Correct edge returned", edge1, returned.next() );
+
+
+        assertFalse( "No more edges", returned.hasNext() );
+
+        //edge2 is passed as the last argument, presumably the paging cursor (last edge already seen) - confirm in
+        //EdgeTestUtils.createSearchByEdge
+        search = createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getVersion(), edge2 );
+
+        edges = em.loadEdgesToTarget( search );
+
+        returned = edges.toBlockingObservable().getIterator();
+
+        assertEquals( "Paged correctly", edge3, returned.next() );
+
+        assertFalse( "End of stream", returned.hasNext() );
+    }
+
+
+    /**
+     * Writes a single edge, reads it back from the source filtered by edge type and target id type, and checks that
+     * a search using a different target id type returns nothing.
+     */
+    @Test
+    public void testWriteReadEdgeTypeTargetTypeSource() {
+
+        GraphManager em = emf.createGraphManager( scope );
+
+
+        Edge edge = createEdge( "source", "test", "target" );
+
+        //block until the write observable has completed
+        em.writeEdge( edge ).toBlockingObservable().last();
+
+        //now test retrieving it
+
+        SearchByIdType search = createSearchByEdgeAndId( edge.getSourceNode(), edge.getType(), edge.getVersion(),
+                edge.getTargetNode().getType(), null );
+
+        Observable<Edge> edges = em.loadEdgesFromSourceByType( search );
+
+        //implicitly blows up if more than 1 is returned from "single"
+        Edge returned = edges.toBlockingObservable().single();
+
+        assertEquals( "Correct edge returned", edge, returned );
+
+
+        //change the target id type to be invalid, shouldn't get a result
+        search = createSearchByEdgeAndId( edge.getSourceNode(), edge.getType(), edge.getVersion(),
+                edge.getTargetNode().getType() + "invalid", null );
+
+        edges = em.loadEdgesFromSourceByType( search );
+
+        //null when nothing is emitted; still blows up if more than 1 is returned
+        returned = edges.toBlockingObservable().singleOrDefault( null );
+
+        assertNull( "Invalid type should not be returned", returned );
+    }
+
+
+    /**
+     * Writes a single edge, reads it back to the target filtered by edge type and source id type, and checks that a
+     * search using a different source id type returns nothing.
+     */
+    @Test
+    public void testWriteReadEdgeTypeTargetTypeTarget() {
+
+        GraphManager em = emf.createGraphManager( scope );
+
+
+        Edge edge = createEdge( "source", "test", "target" );
+
+        //block until the write observable has completed
+        em.writeEdge( edge ).toBlockingObservable().last();
+
+        //now test retrieving it
+
+        SearchByIdType search = createSearchByEdgeAndId( edge.getTargetNode(), edge.getType(), edge.getVersion(),
+                edge.getSourceNode().getType(), null );
+
+        Observable<Edge> edges = em.loadEdgesToTargetByType( search );
+
+        //implicitly blows up if more than 1 is returned from "single"
+        Edge returned = edges.toBlockingObservable().single();
+
+        assertEquals( "Correct edge returned", edge, returned );
+
+
+        //change the source id type to be invalid, shouldn't get a result
+        search = createSearchByEdgeAndId( edge.getTargetNode(), edge.getType(), edge.getVersion(),
+                edge.getSourceNode().getType() + "invalid", null );
+
+        edges = em.loadEdgesToTargetByType( search );
+
+        //null when nothing is emitted; still blows up if more than 1 is returned
+        returned = edges.toBlockingObservable().singleOrDefault( null );
+
+        assertNull( "Invalid type should not be returned", returned );
+    }
+
+
+    /**
+     * Writes a single edge, confirms it is returned from the source both by edge type and by edge type plus target
+     * id type, deletes it, and then confirms that neither search returns it any more.
+     */
+    @Test
+    public void testWriteReadEdgeDeleteSource() {
+
+        GraphManager em = emf.createGraphManager( scope );
+
+
+        Edge edge = createEdge( "source", "test", "target" );
+
+        //block until the write observable has completed
+        em.writeEdge( edge ).toBlockingObservable().last();
+
+        //now test retrieving it
+
+
+        SearchByEdgeType search = createSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getVersion(), null );
+
+        Observable<Edge> edges = em.loadEdgesFromSource( search );
+
+        //implicitly blows up if more than 1 is returned from "single"
+        Edge returned = edges.toBlockingObservable().single();
+
+        assertEquals( "Correct edge returned", edge, returned );
+
+        SearchByIdType searchById = createSearchByEdgeAndId( edge.getSourceNode(), edge.getType(), edge.getVersion(),
+                edge.getTargetNode().getType(), null );
+
+        edges = em.loadEdgesFromSourceByType( searchById );
+
+        //implicitly blows up if more than 1 is returned from "single"
+        returned = edges.toBlockingObservable().single();
+
+        assertEquals( "Correct edge returned", edge, returned );
+
+
+        //now delete it, blocking until the delete observable has completed
+        em.deleteEdge( edge ).toBlockingObservable().last();
+
+        //now test retrieval, should be null
+        edges = em.loadEdgesFromSource( search );
+
+        //null when nothing is emitted; still blows up if more than 1 is returned
+        returned = edges.toBlockingObservable().singleOrDefault( null );
+
+        assertNull( "No edge returned", returned );
+
+
+        //now search by type, should be null as well
+
+        edges = em.loadEdgesFromSourceByType( searchById );
+
+        //null when nothing is emitted; still blows up if more than 1 is returned
+        returned = edges.toBlockingObservable().singleOrDefault( null );
+
+        assertNull( "No edge returned", returned );
+    }
+
+
+    /**
+     * Writes a single edge, confirms it is returned to the target both by edge type and by edge type plus source id
+     * type, deletes it, and then confirms that neither search returns it any more.
+     */
+    @Test
+    public void testWriteReadEdgeDeleteTarget() {
+
+        GraphManager em = emf.createGraphManager( scope );
+
+
+        Edge edge = createEdge( "source", "test", "target" );
+
+        //block until the write observable has completed
+        em.writeEdge( edge ).toBlockingObservable().last();
+
+        //now test retrieving it
+
+
+        SearchByEdgeType search = createSearchByEdge( edge.getTargetNode(), edge.getType(), edge.getVersion(), null );
+
+        Observable<Edge> edges = em.loadEdgesToTarget( search );
+
+        //implicitly blows up if more than 1 is returned from "single"
+        Edge returned = edges.toBlockingObservable().single();
+
+        assertEquals( "Correct edge returned", edge, returned );
+
+        SearchByIdType searchById = createSearchByEdgeAndId( edge.getTargetNode(), edge.getType(), edge.getVersion(),
+                edge.getSourceNode().getType(), null );
+
+        edges = em.loadEdgesToTargetByType( searchById );
+
+        //implicitly blows up if more than 1 is returned from "single"
+        returned = edges.toBlockingObservable().single();
+
+        assertEquals( "Correct edge returned", edge, returned );
+
+
+        //now delete it, blocking until the delete observable has completed
+        em.deleteEdge( edge ).toBlockingObservable().last();
+
+        //now test retrieval, should be null
+        edges = em.loadEdgesToTarget( search );
+
+        //null when nothing is emitted; still blows up if more than 1 is returned
+        returned = edges.toBlockingObservable().singleOrDefault( null );
+
+        assertNull( "No edge returned", returned );
+
+
+        //now search by type, should be null as well
+
+        edges = em.loadEdgesToTargetByType( searchById );
+
+        //null when nothing is emitted; still blows up if more than 1 is returned
+        returned = edges.toBlockingObservable().singleOrDefault( null );
+
+        assertNull( "No edge returned", returned );
+    }
+
+
+    /**
+     * Writes three edges from one source ("test" to two target types, "test2" to one target type) and checks the
+     * edge types reported for the source, and the target id types reported for each edge type, including the order
+     * in which they are returned.
+     */
+    @Test
+    public void testWriteReadEdgeTypesSourceTypes() {
+
+        final GraphManager em = emf.createGraphManager( scope );
+
+        Id sourceId = new SimpleId( "source" );
+        Id targetId1 = new SimpleId( "target" );
+        Id targetId2 = new SimpleId( "target2" );
+
+        //type "test", target type "target"
+        Edge testTargetEdge = createEdge( sourceId, "test", targetId1, UUIDGenerator.newTimeUUID() );
+
+        em.writeEdge( testTargetEdge ).toBlockingObservable().singleOrDefault( null );
+
+        //type "test", target type "target2"
+        Edge testTarget2Edge = createEdge( sourceId, "test", targetId2, UUIDGenerator.newTimeUUID() );
+
+        em.writeEdge( testTarget2Edge ).toBlockingObservable().singleOrDefault( null );
+
+
+        //type "test2", target type "target"
+        Edge test2TargetEdge = createEdge( sourceId, "test2", targetId1, UUIDGenerator.newTimeUUID() );
+
+        em.writeEdge( test2TargetEdge ).toBlockingObservable().singleOrDefault( null );
+
+
+        //get our 2 edge types
+        Observable<String> edges =
+                em.getEdgeTypesFromSource( new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), null ) );
+
+
+        Iterator<String> results = edges.toBlockingObservable().getIterator();
+
+
+        assertEquals( "Edges correct", "test", results.next() );
+
+        assertEquals( "Edges correct", "test2", results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+
+        //now test sub edges: the target id types reachable via "test"
+
+        edges = em.getIdTypesFromSource( new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test", null ) );
+
+        results = edges.toBlockingObservable().getIterator();
+
+
+        assertEquals( "Types correct", targetId1.getType(), results.next() );
+
+        assertEquals( "Types correct", targetId2.getType(), results.next() );
+
+        assertFalse( "No results", results.hasNext() );
+
+        //now get types for test2
+        edges = em.getIdTypesFromSource( new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test2", null ) );
+
+        results = edges.toBlockingObservable().getIterator();
+
+        assertEquals( "Types correct", targetId1.getType(), results.next() );
+
+        assertFalse( "No results", results.hasNext() );
+    }
+
+
+    /**
+     * Writes three edges into one target ("test" from two source types, "test2" from one source type) and checks the
+     * edge types reported for the target, and the source id types reported for each edge type, including the order
+     * in which they are returned.
+     */
+    @Test
+    public void testWriteReadEdgeTypesTargetTypes() {
+
+        final GraphManager em = emf.createGraphManager( scope );
+
+        Id sourceId1 = new SimpleId( "source" );
+        Id sourceId2 = new SimpleId( "source2" );
+        Id targetId1 = new SimpleId( "target" );
+
+
+        //type "test", source type "source"
+        Edge testTargetEdge = createEdge( sourceId1, "test", targetId1, UUIDGenerator.newTimeUUID() );
+
+        em.writeEdge( testTargetEdge ).toBlockingObservable().singleOrDefault( null );
+
+        //type "test", source type "source2"
+        Edge testTarget2Edge = createEdge( sourceId2, "test", targetId1, UUIDGenerator.newTimeUUID() );
+
+        em.writeEdge( testTarget2Edge ).toBlockingObservable().singleOrDefault( null );
+
+
+        //type "test2", source type "source"
+        Edge test2TargetEdge = createEdge( sourceId1, "test2", targetId1, UUIDGenerator.newTimeUUID() );
+
+        em.writeEdge( test2TargetEdge ).toBlockingObservable().singleOrDefault( null );
+
+
+        //get our 2 edge types
+        final SearchEdgeType edgeTypes = new SimpleSearchEdgeType( testTargetEdge.getTargetNode(), null );
+
+        Observable<String> edges = em.getEdgeTypesToTarget( edgeTypes );
+
+
+        Iterator<String> results = edges.toBlockingObservable().getIterator();
+
+
+        assertEquals( "Edges correct", "test", results.next() );
+
+        assertEquals( "Edges correct", "test2", results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+
+        //now test sub edges: the source id types that reach the target via "test"
+
+        edges = em.getIdTypesToTarget( new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", null ) );
+
+        results = edges.toBlockingObservable().getIterator();
+
+        assertEquals( "Types correct", sourceId1.getType(), results.next() );
+
+        assertEquals( "Types correct", sourceId2.getType(), results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+
+        //now get types for test2
+        edges = em.getIdTypesToTarget( new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test2", null ) );
+
+        results = edges.toBlockingObservable().getIterator();
+
+
+        assertEquals( "Types correct", sourceId1.getType(), results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+    }
+
+
+    /**
+     * Writes three edges from one source ("test" to two target types, "test2" to one target type) and checks that
+     * edge types and target id types can be listed in full, and that passing the first value as the last
+     * constructor argument of the search returns only the values after it.
+     */
+    @Test
+    public void testWriteReadEdgeTypesSourceTypesPaging() {
+
+        final GraphManager em = emf.createGraphManager( scope );
+
+        Id sourceId1 = new SimpleId( "source" );
+        Id targetId1 = new SimpleId( "target" );
+        Id targetId2 = new SimpleId( "target2" );
+
+
+        //type "test", target type "target"
+        Edge testTargetEdge = createEdge( sourceId1, "test", targetId1, UUIDGenerator.newTimeUUID() );
+
+        em.writeEdge( testTargetEdge ).toBlockingObservable().singleOrDefault( null );
+
+
+        //type "test", target type "target2"
+        Edge testTargetEdge2 = createEdge( sourceId1, "test", targetId2, UUIDGenerator.newTimeUUID() );
+
+        em.writeEdge( testTargetEdge2 ).toBlockingObservable().singleOrDefault( null );
+
+
+        //type "test2", target type "target2"
+        Edge test2TargetEdge = createEdge( sourceId1, "test2", targetId2, UUIDGenerator.newTimeUUID() );
+
+        em.writeEdge( test2TargetEdge ).toBlockingObservable().singleOrDefault( null );
+
+
+        //get our 2 edge types
+        SearchEdgeType edgeTypes = new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), null );
+
+        Observable<String> edges = em.getEdgeTypesFromSource( edgeTypes );
+
+
+        Iterator<String> results = edges.toBlockingObservable().getIterator();
+
+
+        assertEquals( "Edges correct", "test", results.next() );
+        assertEquals( "Edges correct", "test2", results.next() );
+        assertFalse( "No more edges", results.hasNext() );
+
+        //now load the next page, passing "test" as the last value already seen
+
+        edgeTypes = new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), "test" );
+
+        edges = em.getEdgeTypesFromSource( edgeTypes );
+
+
+        results = edges.toBlockingObservable().getIterator();
+
+
+        assertEquals( "Edges correct", "test2", results.next() );
+        assertFalse( "No more edges", results.hasNext() );
+
+
+        //now test sub edges: the target id types reachable via "test"
+
+        edges = em.getIdTypesFromSource( new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test", null ) );
+
+        results = edges.toBlockingObservable().getIterator();
+
+
+        assertEquals( "Types correct", targetId1.getType(), results.next() );
+        assertEquals( "Types correct", targetId2.getType(), results.next() );
+        assertFalse( "No more edges", results.hasNext() );
+
+
+        //now get the next page, passing the first target id type as the last value already seen
+
+        edges = em.getIdTypesFromSource(
+                new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test", targetId1.getType() ) );
+
+        results = edges.toBlockingObservable().getIterator();
+
+
+        assertEquals( "Types correct", targetId2.getType(), results.next() );
+
+        assertFalse( "No more results", results.hasNext() );
+    }
+
+
+    /**
+     * Writes three edges into a single target node -- two of type "test" (from two different
+     * sources) and one of type "test2" -- then checks the edge types and the source id types
+     * reported for that target, first from the beginning and then resuming after the first value.
+     */
+    @Test
+    public void testWriteReadEdgeTypesTargetTypesPaging() {
+
+        final GraphManager graph = emf.createGraphManager( scope );
+
+        final Id firstSource = new SimpleId( "source" );
+        final Id secondSource = new SimpleId( "source2" );
+        final Id target = new SimpleId( "target" );
+
+
+        final Edge testTargetEdge = createEdge( firstSource, "test", target, UUIDGenerator.newTimeUUID() );
+        graph.writeEdge( testTargetEdge ).toBlockingObservable().singleOrDefault( null );
+
+        final Edge testTargetEdge2 = createEdge( secondSource, "test", target, UUIDGenerator.newTimeUUID() );
+        graph.writeEdge( testTargetEdge2 ).toBlockingObservable().singleOrDefault( null );
+
+        final Edge test2TargetEdge = createEdge( secondSource, "test2", target, UUIDGenerator.newTimeUUID() );
+        graph.writeEdge( test2TargetEdge ).toBlockingObservable().singleOrDefault( null );
+
+
+        //edge types, read from the beginning: both types come back
+        Iterator<String> found =
+                graph.getEdgeTypesToTarget( new SimpleSearchEdgeType( testTargetEdge.getTargetNode(), null ) )
+                     .toBlockingObservable().getIterator();
+
+        assertEquals( "Edges correct", "test", found.next() );
+        assertEquals( "Edges correct", "test2", found.next() );
+        assertFalse( "No more edges", found.hasNext() );
+
+
+        //edge types, next page after "test": only "test2" is left
+        found = graph.getEdgeTypesToTarget( new SimpleSearchEdgeType( testTargetEdge2.getTargetNode(), "test" ) )
+                     .toBlockingObservable().getIterator();
+
+        assertEquals( "Edges correct", "test2", found.next() );
+        assertFalse( "No more edges", found.hasNext() );
+
+
+        //source id types below the "test" edge type, read from the beginning
+        found = graph.getIdTypesToTarget( new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", null ) )
+                     .toBlockingObservable().getIterator();
+
+        assertEquals( "Types correct", firstSource.getType(), found.next() );
+        assertEquals( "Types correct", secondSource.getType(), found.next() );
+        assertFalse( "No more edges", found.hasNext() );
+
+
+        //source id types, next page after the first source's type
+        found = graph.getIdTypesToTarget(
+                new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", firstSource.getType() ) )
+                     .toBlockingObservable().getIterator();
+
+        assertEquals( "Types correct", secondSource.getType(), found.next() );
+        assertFalse( "No more results", found.hasNext() );
+    }
+
+
+    /**
+     * Writes two "test" edges out of one source node, then deletes them one at a time and verifies
+     * after each delete that a search from the source no longer returns the deleted edge.
+     */
+    @Test
+    public void testMarkSourceEdges() {
+
+        final GraphManager graph = emf.createGraphManager( scope );
+
+        final Id source = new SimpleId( "source" );
+        final Id firstTarget = new SimpleId( "target" );
+        final Id secondTarget = new SimpleId( "target2" );
+
+        final Edge edge1 = createEdge( source, "test", firstTarget, UUIDGenerator.newTimeUUID() );
+        graph.writeEdge( edge1 ).toBlockingObservable().singleOrDefault( null );
+
+        final Edge edge2 = createEdge( source, "test", secondTarget, UUIDGenerator.newTimeUUID() );
+        graph.writeEdge( edge2 ).toBlockingObservable().singleOrDefault( null );
+
+
+        //generated after both writes and used as the version bound of every search below
+        final UUID maxVersion = UUIDGenerator.newTimeUUID();
+
+
+        //both edges are visible to begin with
+        Iterator<Edge> found = graph.loadEdgesFromSource(
+                createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) )
+                                    .toBlockingObservable().getIterator();
+
+        assertEquals( "Edges correct", edge1, found.next() );
+        assertEquals( "Edges correct", edge2, found.next() );
+        assertFalse( "No more edges", found.hasNext() );
+
+
+        //delete the first edge: only the second one is returned
+        graph.deleteEdge( edge1 ).toBlockingObservable().last();
+
+        found = graph.loadEdgesFromSource(
+                createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) )
+                     .toBlockingObservable().getIterator();
+
+        assertEquals( "Edges correct", edge2, found.next() );
+        assertFalse( "No more edges", found.hasNext() );
+
+
+        //delete the second edge: nothing is returned any more
+        graph.deleteEdge( edge2 ).toBlockingObservable().last();
+
+        found = graph.loadEdgesFromSource(
+                createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) )
+                     .toBlockingObservable().getIterator();
+
+        assertFalse( "No more edges", found.hasNext() );
+    }
+
+
+    /**
+     * Writes two "test" edges into one target node, then deletes them one at a time and verifies
+     * after each delete that a search to the target no longer returns the deleted edge.
+     */
+    @Test
+    public void testMarkTargetEdges() {
+
+        final GraphManager graph = emf.createGraphManager( scope );
+
+        final Id firstSource = new SimpleId( "source" );
+        final Id secondSource = new SimpleId( "source2" );
+        final Id target = new SimpleId( "target" );
+
+        final Edge edge1 = createEdge( firstSource, "test", target, UUIDGenerator.newTimeUUID() );
+        graph.writeEdge( edge1 ).toBlockingObservable().last();
+
+        final Edge edge2 = createEdge( secondSource, "test", target, UUIDGenerator.newTimeUUID() );
+        graph.writeEdge( edge2 ).toBlockingObservable().last();
+
+
+        //generated after both writes and used as the version bound of every search below
+        final UUID maxVersion = UUIDGenerator.newTimeUUID();
+
+
+        //both edges are visible to begin with
+        Iterator<Edge> found = graph.loadEdgesToTarget(
+                createSearchByEdge( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) )
+                                    .toBlockingObservable().getIterator();
+
+        assertEquals( "Edges correct", edge1, found.next() );
+        assertEquals( "Edges correct", edge2, found.next() );
+        assertFalse( "No more edges", found.hasNext() );
+
+
+        //delete the first edge: only the second one is returned
+        graph.deleteEdge( edge1 ).toBlockingObservable().last();
+
+        found = graph.loadEdgesToTarget(
+                createSearchByEdge( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) )
+                     .toBlockingObservable().getIterator();
+
+        assertEquals( "Edges correct", edge2, found.next() );
+        assertFalse( "No more edges", found.hasNext() );
+
+
+        //delete the second edge: nothing is returned any more
+        graph.deleteEdge( edge2 ).toBlockingObservable().last();
+
+        found = graph.loadEdgesToTarget(
+                createSearchByEdge( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) )
+                     .toBlockingObservable().getIterator();
+
+        assertFalse( "No more edges", found.hasNext() );
+    }
+
+
+    /**
+     * Writes two "test" edges out of one source node to targets of different id types, then
+     * deletes them one at a time and verifies that the search filtered by each target id type
+     * stops returning the edge once it has been deleted.
+     */
+    @Test
+    public void testMarkSourceEdgesType() {
+
+        final GraphManager graph = emf.createGraphManager( scope );
+
+        final Id sourceId = new SimpleId( "source" );
+        final Id firstTarget = new SimpleId( "target" );
+        final Id secondTarget = new SimpleId( "target2" );
+
+        final Edge edge1 = createEdge( sourceId, "test", firstTarget, UUIDGenerator.newTimeUUID() );
+        graph.writeEdge( edge1 ).toBlockingObservable().singleOrDefault( null );
+
+        final Edge edge2 = createEdge( sourceId, "test", secondTarget, UUIDGenerator.newTimeUUID() );
+        graph.writeEdge( edge2 ).toBlockingObservable().singleOrDefault( null );
+
+
+        //generated after both writes and used as the version bound of every search below
+        final UUID maxVersion = UUIDGenerator.newTimeUUID();
+
+
+        //filtered by the first target's type: exactly edge1
+        Iterator<Edge> found = graph.loadEdgesFromSourceByType(
+                createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, firstTarget.getType(), null ) )
+                                    .toBlockingObservable().getIterator();
+
+        assertEquals( "Edges correct", edge1, found.next() );
+        assertFalse( "No more edges", found.hasNext() );
+
+
+        //delete edge1: the same search is now empty
+        graph.deleteEdge( edge1 ).toBlockingObservable().last();
+
+        found = graph.loadEdgesFromSourceByType(
+                createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, firstTarget.getType(), null ) )
+                     .toBlockingObservable().getIterator();
+
+        assertFalse( "No more edges", found.hasNext() );
+
+
+        //filtered by the second target's type: edge2 is still there
+        found = graph.loadEdgesFromSourceByType(
+                createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, secondTarget.getType(), null ) )
+                     .toBlockingObservable().getIterator();
+
+        assertEquals( "Edges correct", edge2, found.next() );
+        assertFalse( "No more edges", found.hasNext() );
+
+
+        //delete edge2: that search is now empty as well
+        graph.deleteEdge( edge2 ).toBlockingObservable().last();
+
+        found = graph.loadEdgesFromSourceByType(
+                createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, secondTarget.getType(), null ) )
+                     .toBlockingObservable().getIterator();
+
+        assertFalse( "No more edges", found.hasNext() );
+    }
+
+
+    /**
+     * Writes two "test" edges into one target node from sources of different id types, then
+     * deletes them one at a time and verifies that the search filtered by each source id type
+     * stops returning the edge once it has been deleted.
+     */
+    @Test
+    public void testMarkTargetEdgesType() {
+
+        final GraphManager em = emf.createGraphManager( scope );
+
+        Id sourceId1 = new SimpleId( "source" );
+        Id sourceId2 = new SimpleId( "source2" );
+        Id targetId = new SimpleId( "target" );
+
+        Edge edge1 = createEdge( sourceId1, "test", targetId, UUIDGenerator.newTimeUUID() );
+
+        em.writeEdge( edge1 ).toBlockingObservable().last();
+
+        Edge edge2 = createEdge( sourceId2, "test", targetId, UUIDGenerator.newTimeUUID() );
+
+        em.writeEdge( edge2 ).toBlockingObservable().last();
+
+
+        //generated after both writes and used as the version bound of every search below
+        final UUID maxVersion = UUIDGenerator.newTimeUUID();
+
+        //filtered by the first source's type: exactly edge1
+        Observable<Edge> edges = em.loadEdgesToTargetByType(
+                createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) );
+
+
+        Iterator<Edge> results = edges.toBlockingObservable().getIterator();
+
+
+        assertEquals( "Edges correct", edge1, results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+        //now delete the first edge
+
+        em.deleteEdge( edge1 ).toBlockingObservable().last();
+
+
+        //repeat the same search against the target node.  Searching on edge1's source node instead would
+        //not verify the delete: nothing written by this test targets that node, so the result would be
+        //empty either way
+        edges = em.loadEdgesToTargetByType(
+                createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) );
+
+        results = edges.toBlockingObservable().getIterator();
+
+
+        assertFalse( "No more edges", results.hasNext() );
+
+
+        //filtered by the second source's type: edge2 is still there
+        edges = em.loadEdgesToTargetByType(
+                createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId2.getType(), null ) );
+
+
+        results = edges.toBlockingObservable().getIterator();
+
+
+        assertEquals( "Edges correct", edge2, results.next() );
+
+        assertFalse( "No more edges", results.hasNext() );
+
+        //now delete the second edge
+
+        em.deleteEdge( edge2 ).toBlockingObservable().last();
+
+
+        edges = em.loadEdgesToTargetByType(
+                createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId2.getType(), null ) );
+
+
+        results = edges.toBlockingObservable().getIterator();
+
+
+        assertFalse( "No more edges", results.hasNext() );
+    }
+
+
+    /**
+     * Writes two "test" edges out of one source node, confirms they can be read back (by edge type
+     * and by edge type plus target id type), deletes the source node, and verifies that none of
+     * those reads returns anything afterwards.
+     */
+    @Test
+    public void markSourceNode() {
+
+        final GraphManager graph = emf.createGraphManager( scope );
+
+        final Id sourceId = new SimpleId( "source" );
+        final Id firstTarget = new SimpleId( "target" );
+        final Id secondTarget = new SimpleId( "target2" );
+
+        final Edge edge1 = createEdge( sourceId, "test", firstTarget, UUIDGenerator.newTimeUUID() );
+        graph.writeEdge( edge1 ).toBlockingObservable().singleOrDefault( null );
+
+        final Edge edge2 = createEdge( sourceId, "test", secondTarget, UUIDGenerator.newTimeUUID() );
+        graph.writeEdge( edge2 ).toBlockingObservable().singleOrDefault( null );
+
+
+        //generated after both writes and used as the version bound of every search below
+        final UUID maxVersion = UUIDGenerator.newTimeUUID();
+
+
+        //before the delete: both edges come back when searching by edge type
+        Observable<Edge> loaded =
+                graph.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) );
+        Iterator<Edge> found = loaded.toBlockingObservable().getIterator();
+
+        assertEquals( "Edge found", edge1, found.next() );
+        assertEquals( "Edge found", edge2, found.next() );
+        assertFalse( "No more edges", found.hasNext() );
+
+
+        //before the delete: edge1 alone for the first target's type
+        loaded = graph.loadEdgesFromSourceByType(
+                createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, firstTarget.getType(), null ) );
+        found = loaded.toBlockingObservable().getIterator();
+
+        assertEquals( "Edges correct", edge1, found.next() );
+        assertFalse( "No more edges", found.hasNext() );
+
+
+        //before the delete: edge2 alone for the second target's type
+        loaded = graph.loadEdgesFromSourceByType(
+                createSearchByEdgeAndId( sourceId, edge2.getType(), maxVersion, secondTarget.getType(), null ) );
+        found = loaded.toBlockingObservable().getIterator();
+
+        assertEquals( "Edges correct", edge2, found.next() );
+        assertFalse( "No more edges", found.hasNext() );
+
+
+        //delete the source node
+        graph.deleteNode( sourceId ).toBlockingObservable().last();
+
+
+        //after the delete: the same three searches must all be empty
+        loaded = graph.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) );
+        found = loaded.toBlockingObservable().getIterator();
+
+        assertFalse( "No more edges", found.hasNext() );
+
+
+        loaded = graph.loadEdgesFromSourceByType(
+                createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, firstTarget.getType(), null ) );
+        found = loaded.toBlockingObservable().getIterator();
+
+        assertFalse( "No more edges", found.hasNext() );
+
+
+        loaded = graph.loadEdgesFromSourceByType(
+                createSearchByEdgeAndId( sourceId, edge2.getType(), maxVersion, secondTarget.getType(), null ) );
+        found = loaded.toBlockingObservable().getIterator();
+
+        assertFalse( "No more edges", found.hasNext() );
+    }
+
+
+    /**
+     * Writes two "test" edges into one target node, confirms they can be read back (by edge type
+     * and by edge type plus source id type), deletes the target node, and verifies that none of
+     * those reads returns anything afterwards.
+     */
+    @Test
+    public void markTargetNode() {
+
+        final GraphManager graph = emf.createGraphManager( scope );
+
+        final Id firstSource = new SimpleId( "source" );
+        final Id secondSource = new SimpleId( "source2" );
+        final Id targetId = new SimpleId( "target" );
+
+        final Edge edge1 = createEdge( firstSource, "test", targetId, UUIDGenerator.newTimeUUID() );
+        graph.writeEdge( edge1 ).toBlockingObservable().singleOrDefault( null );
+
+        final Edge edge2 = createEdge( secondSource, "test", targetId, UUIDGenerator.newTimeUUID() );
+        graph.writeEdge( edge2 ).toBlockingObservable().singleOrDefault( null );
+
+
+        //generated after both writes and used as the version bound of every search below
+        final UUID maxVersion = UUIDGenerator.newTimeUUID();
+
+
+        //before the delete: both edges come back when searching by edge type
+        Observable<Edge> loaded =
+                graph.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) );
+        Iterator<Edge> found = loaded.toBlockingObservable().getIterator();
+
+        assertEquals( "Edge found", edge1, found.next() );
+        assertEquals( "Edge found", edge2, found.next() );
+        assertFalse( "No more edges", found.hasNext() );
+
+
+        //before the delete: edge1 alone for the first source's type
+        loaded = graph.loadEdgesToTargetByType(
+                createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, firstSource.getType(), null ) );
+        found = loaded.toBlockingObservable().getIterator();
+
+        assertEquals( "Edges correct", edge1, found.next() );
+        assertFalse( "No more edges", found.hasNext() );
+
+
+        //before the delete: edge2 alone for the second source's type
+        loaded = graph.loadEdgesToTargetByType(
+                createSearchByEdgeAndId( targetId, edge2.getType(), maxVersion, secondSource.getType(), null ) );
+        found = loaded.toBlockingObservable().getIterator();
+
+        assertEquals( "Edges correct", edge2, found.next() );
+        assertFalse( "No more edges", found.hasNext() );
+
+
+        //delete the target node
+        graph.deleteNode( targetId ).toBlockingObservable().last();
+
+
+        //after the delete: the same three searches must all be empty
+        loaded = graph.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) );
+        found = loaded.toBlockingObservable().getIterator();
+
+        assertFalse( "No more edges", found.hasNext() );
+
+
+        loaded = graph.loadEdgesToTargetByType(
+                createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, firstSource.getType(), null ) );
+        found = loaded.toBlockingObservable().getIterator();
+
+        assertFalse( "No more edges", found.hasNext() );
+
+
+        loaded = graph.loadEdgesToTargetByType(
+                createSearchByEdgeAndId( targetId, edge2.getType(), maxVersion, secondSource.getType(), null ) );
+        found = loaded.toBlockingObservable().getIterator();
+
+        assertFalse( "No more edges", found.hasNext() );
+    }
+
+
+    /**
+     * Hands the injected edge to {@code writeEdge} and expects a NullPointerException.
+     * NOTE(review): the edge presumably comes from the invalid-input bindings that are currently
+     * commented out further down in this class -- confirm how the {@code @All} parameter is supplied.
+     */
+    @Test( expected = NullPointerException.class )
+    public void invalidEdgeTypesWrite( @All Edge edge ) {
+        emf.createGraphManager( scope ).writeEdge( edge );
+    }
+
+
+    /**
+     * Hands the injected edge to {@code deleteEdge} and expects a NullPointerException.
+     * NOTE(review): the edge presumably comes from the invalid-input bindings that are currently
+     * commented out further down in this class -- confirm how the {@code @All} parameter is supplied.
+     */
+    @Test( expected = NullPointerException.class )
+    public void invalidEdgeTypesDelete( @All Edge edge ) {
+        emf.createGraphManager( scope ).deleteEdge( edge );
+    }
+
+    //
+    //    public static class InvalidInput extends JukitoModule {
+    //
+    //        @Override
+    //        protected void configureTest() {
+    //create all edge types of junk input
+    //
+    //            final UUID version = UUIDGenerator.newTimeUUID();
+    //
+    //            Id nullUuid = mock( Id.class );
+    //            when( nullUuid.getUuid() ).thenReturn( null );
+    //
+    //
+    //            Id nullType = mock( Id.class );
+    //            when( nullType.getType() ).thenReturn( "type" );
+    //
+    //            Edge[] edges = new Edge[] {
+    //                    mockEdge( nullUuid, "test", createId( "target" ), version ),
+    //
+    //                    mockEdge( nullType, "test", createId( "target" ), version ),
+    //
+    //                    mockEdge( createId( "source" ), null, createId( "target" ), version ),
+    //
+    //                    mockEdge( createId( "source" ), "test", nullUuid, version ),
+    //
+    //                    mockEdge( createId( "source" ), "test", nullType, version ),
+    //
+    //                    mockEdge( createId( "source" ), "test", createId( "target" ), null )
+    //            };
+    //
+    //
+    //            bindManyInstances( Edge.class, edges );
+    //
+    //        }
+    //
+    //
+    //        private Edge mockEdge( final Id sourceId, final String type, final Id targetId, final UUID version ) {
+    //            Edge edge = mock( Edge.class );
+    //
+    //            when( edge.getSourceNode() ).thenReturn( sourceId );
+    //            when( edge.getType() ).thenReturn( type );
+    //            when( edge.getTargetNode() ).thenReturn( targetId );
+    //            when( edge.getVersion() ).thenReturn( version );
+    //
+    //            return edge;
+    //        }
+    //    }
+}
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
index eaade41..747e679 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
@@ -88,13 +88,12 @@ public class AsyncProcessorTest {
     }
 
 
-    @Test( timeout = 5000 )
+    @Test(timeout = 5000)
     public void verifyAsyncExecution() throws InterruptedException {
 
         final TestListener listener = new TestListener();
 
 
-
         final TestEvent event = new TestEvent();
 
 
@@ -121,7 +120,7 @@ public class AsyncProcessorTest {
 
         final CountDownLatch latch = new CountDownLatch( 2 );
 
-        final TestCompleteListener completeListener = new TestCompleteListener(latch);
+        final TestCompleteListener completeListener = new TestCompleteListener( latch );
 
         asyncProcessor.addCompleteListener( completeListener );
 
@@ -149,7 +148,7 @@ public class AsyncProcessorTest {
 
         final TestEvent completeEvent = completeListener.events.peek();
 
-        assertSame(event, completeEvent);
+        assertSame( event, completeEvent );
     }
 
 
@@ -252,33 +251,29 @@ public class AsyncProcessorTest {
         final TimeoutQueue queue = mock( TimeoutQueue.class );
 
 
-        when(queue.take( 1, 10000l )).thenReturn( Collections.singletonList(asynchronousMessage ));
+        when( queue.take( 1, 10000l ) ).thenReturn( Collections.singletonList( asynchronousMessage ) );
 
         AsyncProcessor<TestEvent> processor = constructProcessor( queue );
 
 
-        Collection<AsynchronousMessage<TestEvent>> timeouts =  processor.getTimeouts( 1, 10000l );
+        Collection<AsynchronousMessage<TestEvent>> timeouts = processor.getTimeouts( 1, 10000l );
 
-        assertEquals(1, timeouts.size());
+        assertEquals( 1, timeouts.size() );
 
         AsynchronousMessage<TestEvent> returned = timeouts.iterator().next();
 
-        assertSame(asynchronousMessage, returned);
-
-
-
+        assertSame( asynchronousMessage, returned );
     }
 
 
-
     /**
      * Construct the async processor
      */
     public <T> AsyncProcessorImpl<T> constructProcessor( TimeoutQueue<T> queue ) {
 
-        GraphFig fig = mock(GraphFig.class);
+        GraphFig fig = mock( GraphFig.class );
 
-        when(fig.getScanPageSize()).thenReturn( 0 );
+        when( fig.getScanPageSize() ).thenReturn( 0 );
 
         AsyncProcessorImpl<T> processor = new AsyncProcessorImpl( queue,  fig );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
index 49ae789..45d13f1 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
@@ -135,7 +135,7 @@ public class LocalTimeoutQueueTest {
             //validate we get a new timeout event since the old one was re-scheduled
             Iterator<AsynchronousMessage<TestEvent>> eventIterator = results.iterator();
 
-            while(eventIterator.hasNext()){
+            while ( eventIterator.hasNext() ) {
 
                 AsynchronousMessage<TestEvent> message = eventIterator.next();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/guice/TestGraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/guice/TestGraphModule.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/guice/TestGraphModule.java
index aab90f5..9e63269 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/guice/TestGraphModule.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/guice/TestGraphModule.java
@@ -18,8 +18,10 @@
  */
 package org.apache.usergrid.persistence.graph.guice;
 
+
 import org.apache.usergrid.persistence.collection.guice.TestModule;
 
+
 /**
  * Wrapper for configuring our guice test env
  */
@@ -27,6 +29,6 @@ public class TestGraphModule extends TestModule {
 
     @Override
     protected void configure() {
-        install(new GraphModule());
+        install( new GraphModule() );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
index dcddb27..e735aea 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
@@ -16,8 +16,9 @@ import org.apache.usergrid.persistence.collection.OrganizationScope;
 import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
 import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.EdgeManager;
-import org.apache.usergrid.persistence.graph.EdgeManagerFactory;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
@@ -30,12 +31,13 @@ import com.google.inject.Inject;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createGetByEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdge;
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchEdge;
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchIdType;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -70,7 +72,10 @@ public class NodeDeleteListenerTest {
     protected NodeSerialization nodeSerialization;
 
     @Inject
-    protected EdgeManagerFactory emf;
+    protected GraphManagerFactory emf;
+
+    @Inject
+    protected GraphFig graphFig;
 
 
     protected OrganizationScope scope;
@@ -96,7 +101,7 @@ public class NodeDeleteListenerTest {
     @Test
     public void testNoDeletionMarked() {
 
-        EdgeManager em = emf.createEdgeManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
         Edge edge = createEdge( "source", "test", "target" );
 
@@ -134,7 +139,7 @@ public class NodeDeleteListenerTest {
     @Test
     public void testRemoveSourceNode() throws ConnectionException {
 
-        EdgeManager em = emf.createEdgeManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
         Edge edge = createEdge( "source", "test", "target" );
 
@@ -159,7 +164,7 @@ public class NodeDeleteListenerTest {
 
         int count = deleteListener.receive( deleteEvent ).toBlockingObservable().last();
 
-        assertEquals( 1, count);
+        assertEquals( 1, count );
 
         //now verify we can't get any of the info back
 
@@ -179,6 +184,12 @@ public class NodeDeleteListenerTest {
 
         assertFalse( "No target should be returned", returned.hasNext() );
 
+
+        returned = edgeSerialization
+                .getEdgeVersions( scope, createGetByEdge( sourceNode, edge.getType(), targetNode, now, null ) );
+
+        assertFalse( "No version should be returned", returned.hasNext() );
+
         //no types from source
 
         Iterator<String> types =
@@ -214,7 +225,190 @@ public class NodeDeleteListenerTest {
      * since it has no other targets
      */
     @Test
-    public void testRemoveTargetNode() {
+    public void testRemoveTargetNode() throws ConnectionException {
+
+        GraphManager em = emf.createEdgeManager( scope );
+
+        Edge edge = createEdge( "source", "test", "target" );
+
+        //write the edge
+        Edge last = em.writeEdge( edge ).toBlockingObservable().last();
+
+
+        assertEquals( edge, last );
+
+        Id sourceNode = edge.getSourceNode();
+
+        Id targetNode = edge.getTargetNode();
+
+
+        //mark the target node as deleted so the delete listener will process it
+        UUID deleteVersion = UUIDGenerator.newTimeUUID();
+
+        nodeSerialization.mark( scope, targetNode, deleteVersion ).execute();
+
+        EdgeEvent<Id> deleteEvent = new EdgeEvent<Id>( scope, deleteVersion, targetNode );
+
+
+        int count = deleteListener.receive( deleteEvent ).toBlockingObservable().last();
+
+        assertEquals( 1, count );
+
+        //now verify we can't get any of the info back
+
+        UUID now = UUIDGenerator.newTimeUUID();
+
+
+        Iterator<MarkedEdge> returned = edgeSerialization
+                .getEdgesFromSource( scope, createSearchByEdge( sourceNode, edge.getType(), now, null ) );
+
+        //no edge from source node should be returned
+        assertFalse( "No source should be returned", returned.hasNext() );
+
+        //validate it's not returned by the search to the target node either
+
+        returned = edgeSerialization
+                .getEdgesToTarget( scope, createSearchByEdge( targetNode, edge.getType(), now, null ) );
+
+        assertFalse( "No target should be returned", returned.hasNext() );
+
+
+        returned = edgeSerialization
+                .getEdgeVersions( scope, createGetByEdge( sourceNode, edge.getType(), targetNode, now, null ) );
+
+        assertFalse( "No version should be returned", returned.hasNext() );
+
+        //no types from source
+
+        Iterator<String> types =
+                edgeMetadataSerialization.getEdgeTypesFromSource( scope, createSearchEdge( sourceNode, null ) );
+
+        assertFalse( types.hasNext() );
+
+
+        //no types to target
+
+        types = edgeMetadataSerialization.getEdgeTypesToTarget( scope, createSearchEdge( targetNode, null ) );
+
+        assertFalse( types.hasNext() );
+
+
+        //no target types from source
+
+        Iterator<String> idTypes = edgeMetadataSerialization
+                .getIdTypesFromSource( scope, createSearchIdType( sourceNode, edge.getType(), null ) );
+
+        assertFalse( idTypes.hasNext() );
+
+        //no source types to target
+        idTypes = edgeMetadataSerialization
+                .getIdTypesToTarget( scope, createSearchIdType( targetNode, edge.getType(), null ) );
+
+        assertFalse( idTypes.hasNext() );
+    }
+
+
+    /**
+     * Test case that writes four scan pages of edges in which the node to delete alternates between source and
+     * target, then removes the node.  No edges or edge metadata for the node should be returned afterwards
+     */
+    @Test
+    public void testMultiDelete() throws ConnectionException {
+
+        GraphManager em = emf.createEdgeManager( scope );
+
+
+        //create loads of edges to easily delete.  We'll keep all the types of "test"
+        final int edgeCount = graphFig.getScanPageSize() * 4;
+        Id toDelete = createId( "toDelete" );
+        final String edgeType = "test";
+
+        int countSaved = 0;
+
+
+        for ( int i = 0; i < edgeCount; i++ ) {
+            Edge edge ;
+
+            //mix up source vs target, good for testing as well as create a lot of sub types to ensure they're removed
+            if ( i % 2 == 0 ) {
+                edge = createEdge( toDelete, edgeType, createId( "target"+Math.random() ) );
+            }
+            else {
+                edge = createEdge( createId( "source"+Math.random() ), edgeType, toDelete );
+            }
+
+            //write the edge
+            Edge last = em.writeEdge( edge ).toBlockingObservable().last();
+
+
+            assertEquals( edge, last );
+
+            countSaved++;
+        }
+
+        assertEquals(edgeCount, countSaved);
+
+
+        //mark the node as deleted so the delete listener will process it
+//        UUID deleteVersion = UUIDGenerator.newTimeUUID();
+
+        UUID deleteVersion = UUID.fromString( "ffffffff-ffff-1fff-bfff-ffffffffffff" );
+
+        nodeSerialization.mark( scope, toDelete, deleteVersion ).execute();
+
+        EdgeEvent<Id> deleteEvent = new EdgeEvent<Id>( scope, deleteVersion, toDelete );
+
+
+        int count = deleteListener.receive( deleteEvent ).toBlockingObservable().last();
+
+        assertEquals( edgeCount, count );
+
+        //now verify we can't get any of the info back
+
+        UUID now = UUIDGenerator.newTimeUUID();
 
+
+        Iterator<MarkedEdge> returned = edgeSerialization
+                .getEdgesFromSource( scope, createSearchByEdge( toDelete, edgeType, now, null ) );
+
+        //no edge from source node should be returned
+        assertFalse( "No source should be returned", returned.hasNext() );
+
+        //validate it's not returned by the search to the target node either
+
+        returned = edgeSerialization
+                .getEdgesToTarget( scope, createSearchByEdge( toDelete, edgeType, now, null ) );
+
+        assertFalse( "No target should be returned", returned.hasNext() );
+
+
+
+        //no types from source
+
+        Iterator<String> types =
+                edgeMetadataSerialization.getEdgeTypesFromSource( scope, createSearchEdge( toDelete, null ) );
+
+        assertFalse( types.hasNext() );
+
+
+        //no types to target
+
+        types = edgeMetadataSerialization.getEdgeTypesToTarget( scope, createSearchEdge( toDelete, null ) );
+
+        assertFalse( types.hasNext() );
+
+
+        //no target types from source
+
+        Iterator<String> idTypes = edgeMetadataSerialization
+                .getIdTypesFromSource( scope, createSearchIdType( toDelete, edgeType, null ) );
+
+        assertFalse( idTypes.hasNext() );
+
+        //no source types to target
+        idTypes = edgeMetadataSerialization
+                .getIdTypesToTarget( scope, createSearchIdType( toDelete, edgeType, null ) );
+
+        assertFalse( idTypes.hasNext() );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
index dc8a3ac..7dce7da 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
@@ -65,8 +65,8 @@ import static org.mockito.Mockito.when;
  *
  *
  */
-@RunWith( JukitoRunner.class )
-@UseModules( { TestGraphModule.class } )
+@RunWith(JukitoRunner.class)
+@UseModules({ TestGraphModule.class })
 public class EdgeDeleteRepairTest {
 
     private static final Logger LOG = LoggerFactory.getLogger( EdgeDeleteRepairTest.class );
@@ -174,7 +174,7 @@ public class EdgeDeleteRepairTest {
 
 
         //now verify we get all the versions we expect back
-        Iterator<MarkedEdge> iterator = edgeSerialization.getEdgeFromSource( scope,
+        Iterator<MarkedEdge> iterator = edgeSerialization.getEdgeVersions( scope,
                 new SimpleSearchByEdge( sourceId, edgeType, targetId, UUIDGenerator.newTimeUUID(), null ) );
 
         int count = 0;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java
index e8835c3..cb8ea67 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java
@@ -43,7 +43,6 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
@@ -62,8 +61,8 @@ import static org.mockito.Mockito.when;
  *
  *
  */
-@RunWith( JukitoRunner.class )
-@UseModules( { TestGraphModule.class } )
+@RunWith(JukitoRunner.class)
+@UseModules({ TestGraphModule.class })
 public class EdgeMetaRepairTest {
 
 
@@ -127,7 +126,7 @@ public class EdgeMetaRepairTest {
         edgeMetadataSerialization.writeEdge( scope, edge ).execute();
 
         int value = edgeMetaRepair.repairTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() )
-                             .toBlockingObservable().single();
+                                  .toBlockingObservable().single();
 
         assertEquals( "No subtypes removed, edge exists", 1, value );
 
@@ -136,7 +135,7 @@ public class EdgeMetaRepairTest {
         edgeSerialization.deleteEdge( scope, edge ).execute();
 
         value = edgeMetaRepair.repairTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() )
-                         .toBlockingObservable().single();
+                              .toBlockingObservable().single();
 
         assertEquals( "Single subtype should be removed", 0, value );
 
@@ -160,21 +159,20 @@ public class EdgeMetaRepairTest {
 
         Id targetId = createId( "target" );
 
-        Edge edge1 = createEdge( createId("source1"), "test", targetId);
-
+        Edge edge1 = createEdge( createId( "source1" ), "test", targetId );
 
 
         edgeSerialization.writeEdge( scope, edge1 ).execute();
 
         edgeMetadataSerialization.writeEdge( scope, edge1 ).execute();
 
-        Edge edge2 = createEdge( createId("source2"), "test", targetId );
+        Edge edge2 = createEdge( createId( "source2" ), "test", targetId );
 
         edgeSerialization.writeEdge( scope, edge2 ).execute();
 
         edgeMetadataSerialization.writeEdge( scope, edge2 ).execute();
 
-        Edge edge3 = createEdge( createId("source3"), "test", targetId );
+        Edge edge3 = createEdge( createId( "source3" ), "test", targetId );
 
         edgeSerialization.writeEdge( scope, edge3 ).execute();
 
@@ -184,7 +182,7 @@ public class EdgeMetaRepairTest {
         UUID cleanupVersion = UUIDGenerator.newTimeUUID();
 
         int value = edgeMetaRepair.repairTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
-                             .toBlockingObservable().single();
+                                  .toBlockingObservable().single();
 
         assertEquals( "No subtypes removed, edges exist", 3, value );
 
@@ -193,21 +191,21 @@ public class EdgeMetaRepairTest {
         edgeSerialization.deleteEdge( scope, edge1 ).execute();
 
         value = edgeMetaRepair.repairTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
-                         .toBlockingObservable().single();
+                              .toBlockingObservable().single();
 
         assertEquals( "No subtypes removed, edges exist", 2, value );
 
         edgeSerialization.deleteEdge( scope, edge2 ).execute();
 
         value = edgeMetaRepair.repairTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
-                         .toBlockingObservable().single();
+                              .toBlockingObservable().single();
 
         assertEquals( "No subtypes removed, edges exist", 1, value );
 
         edgeSerialization.deleteEdge( scope, edge3 ).execute();
 
         value = edgeMetaRepair.repairTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
-                         .toBlockingObservable().single();
+                              .toBlockingObservable().single();
 
 
         assertEquals( "Single subtype should be removed", 0, value );
@@ -233,13 +231,13 @@ public class EdgeMetaRepairTest {
         final Id targetId = createId( "target" );
         final String edgeType = "test";
 
-        final int size =  graphFig.getRepairConcurrentSize()*2;
+        final int size = graphFig.getRepairConcurrentSize() * 2;
 
         Set<Edge> writtenEdges = new HashSet<Edge>();
 
 
-        for(int i = 0; i < size; i ++){
-            Edge edge = createEdge( createId("source"+i), edgeType, targetId);
+        for ( int i = 0; i < size; i++ ) {
+            Edge edge = createEdge( createId( "source" + i ), edgeType, targetId );
 
             edgeSerialization.writeEdge( scope, edge ).execute();
 
@@ -251,27 +249,26 @@ public class EdgeMetaRepairTest {
 
         UUID cleanupVersion = UUIDGenerator.newTimeUUID();
 
-        int value = edgeMetaRepair.repairTargets( scope, targetId, edgeType, cleanupVersion )
-                             .toBlockingObservable().single();
+        int value = edgeMetaRepair.repairTargets( scope, targetId, edgeType, cleanupVersion ).toBlockingObservable()
+                                  .single();
 
         assertEquals( "No subtypes removed, edges exist", size, value );
 
         //now delete the edge
 
-        for(Edge created: writtenEdges){
+        for ( Edge created : writtenEdges ) {
             edgeSerialization.deleteEdge( scope, created ).execute();
         }
 
 
-        value = edgeMetaRepair.repairTargets( scope, targetId, edgeType, cleanupVersion )
-                                     .toBlockingObservable().last();
+        value = edgeMetaRepair.repairTargets( scope, targetId, edgeType, cleanupVersion ).toBlockingObservable().last();
 
         assertEquals( "Subtypes removed", 0, value );
 
         //now verify they're gone
 
-        Iterator<String> edgeTypes = edgeMetadataSerialization
-                .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( targetId, null ) );
+        Iterator<String> edgeTypes =
+                edgeMetadataSerialization.getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( targetId, null ) );
 
         assertFalse( "No edge types exist", edgeTypes.hasNext() );
 
@@ -283,7 +280,6 @@ public class EdgeMetaRepairTest {
     }
 
 
-
     @Test
     public void cleanSourceSingleEdge() throws ConnectionException {
         Edge edge = createEdge( "source", "test", "target" );
@@ -293,7 +289,7 @@ public class EdgeMetaRepairTest {
         edgeMetadataSerialization.writeEdge( scope, edge ).execute();
 
         int value = edgeMetaRepair.repairSources( scope, edge.getSourceNode(), edge.getType(), edge.getVersion() )
-                             .toBlockingObservable().single();
+                                  .toBlockingObservable().single();
 
         assertEquals( "No subtypes removed, edge exists", 1, value );
 
@@ -302,7 +298,7 @@ public class EdgeMetaRepairTest {
         edgeSerialization.deleteEdge( scope, edge ).execute();
 
         value = edgeMetaRepair.repairSources( scope, edge.getSourceNode(), edge.getType(), edge.getVersion() )
-                         .toBlockingObservable().single();
+                              .toBlockingObservable().single();
 
         assertEquals( "Single subtype should be removed", 0, value );
 
@@ -326,21 +322,20 @@ public class EdgeMetaRepairTest {
 
         Id sourceId = createId( "source" );
 
-        Edge edge1 = createEdge( sourceId, "test", createId("target1"));
-
+        Edge edge1 = createEdge( sourceId, "test", createId( "target1" ) );
 
 
         edgeSerialization.writeEdge( scope, edge1 ).execute();
 
         edgeMetadataSerialization.writeEdge( scope, edge1 ).execute();
 
-        Edge edge2 = createEdge( sourceId, "test", createId("target2") );
+        Edge edge2 = createEdge( sourceId, "test", createId( "target2" ) );
 
         edgeSerialization.writeEdge( scope, edge2 ).execute();
 
         edgeMetadataSerialization.writeEdge( scope, edge2 ).execute();
 
-        Edge edge3 = createEdge( sourceId, "test", createId("target3") );
+        Edge edge3 = createEdge( sourceId, "test", createId( "target3" ) );
 
         edgeSerialization.writeEdge( scope, edge3 ).execute();
 
@@ -350,7 +345,7 @@ public class EdgeMetaRepairTest {
         UUID cleanupVersion = UUIDGenerator.newTimeUUID();
 
         int value = edgeMetaRepair.repairSources( scope, edge1.getSourceNode(), edge1.getType(), cleanupVersion )
-                             .toBlockingObservable().single();
+                                  .toBlockingObservable().single();
 
         assertEquals( "No subtypes removed, edges exist", 3, value );
 
@@ -359,21 +354,21 @@ public class EdgeMetaRepairTest {
         edgeSerialization.deleteEdge( scope, edge1 ).execute();
 
         value = edgeMetaRepair.repairSources( scope, edge1.getSourceNode(), edge1.getType(), cleanupVersion )
-                         .toBlockingObservable().single();
+                              .toBlockingObservable().single();
 
         assertEquals( "No subtypes removed, edges exist", 2, value );
 
         edgeSerialization.deleteEdge( scope, edge2 ).execute();
 
         value = edgeMetaRepair.repairSources( scope, edge1.getSourceNode(), edge1.getType(), cleanupVersion )
-                         .toBlockingObservable().single();
+                              .toBlockingObservable().single();
 
         assertEquals( "No subtypes removed, edges exist", 1, value );
 
         edgeSerialization.deleteEdge( scope, edge3 ).execute();
 
         value = edgeMetaRepair.repairSources( scope, edge1.getSourceNode(), edge1.getType(), cleanupVersion )
-                         .toBlockingObservable().single();
+                              .toBlockingObservable().single();
 
 
         assertEquals( "Single subtype should be removed", 0, value );
@@ -400,13 +395,13 @@ public class EdgeMetaRepairTest {
 
         final String edgeType = "test";
 
-        final int size =  graphFig.getRepairConcurrentSize()*2;
+        final int size = graphFig.getRepairConcurrentSize() * 2;
 
         Set<Edge> writtenEdges = new HashSet<Edge>();
 
 
-        for(int i = 0; i < size; i ++){
-            Edge edge = createEdge( sourceId, edgeType, createId("target"+i));
+        for ( int i = 0; i < size; i++ ) {
+            Edge edge = createEdge( sourceId, edgeType, createId( "target" + i ) );
 
             edgeSerialization.writeEdge( scope, edge ).execute();
 
@@ -418,27 +413,27 @@ public class EdgeMetaRepairTest {
 
         UUID cleanupVersion = UUIDGenerator.newTimeUUID();
 
-        int value = edgeMetaRepair.repairSources( scope, sourceId, edgeType, cleanupVersion )
-                             .toBlockingObservable().single();
+        int value = edgeMetaRepair.repairSources( scope, sourceId, edgeType, cleanupVersion ).toBlockingObservable()
+                                  .single();
 
         assertEquals( "No subtypes removed, edges exist", size, value );
 
         //now delete the edge
 
-        for(Edge created: writtenEdges){
+        for ( Edge created : writtenEdges ) {
             edgeSerialization.deleteEdge( scope, created ).execute();
         }
 
 
-        value = edgeMetaRepair.repairSources( scope, sourceId, edgeType, cleanupVersion )
-                                     .toBlockingObservable().single();
+        value = edgeMetaRepair.repairSources( scope, sourceId, edgeType, cleanupVersion ).toBlockingObservable()
+                              .single();
 
         assertEquals( "Subtypes removed", 0, value );
 
         //now verify they're gone
 
-        Iterator<String> edgeTypes = edgeMetadataSerialization
-                .getEdgeTypesFromSource( scope, new SimpleSearchEdgeType( sourceId, null ) );
+        Iterator<String> edgeTypes =
+                edgeMetadataSerialization.getEdgeTypesFromSource( scope, new SimpleSearchEdgeType( sourceId, null ) );
 
         assertFalse( "No edge types exist", edgeTypes.hasNext() );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
index 3416421..c7c95a1 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
@@ -24,11 +24,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jukito.JukitoRunner;
 import org.jukito.UseModules;
@@ -69,8 +65,8 @@ import static org.mockito.Mockito.when;
  *
  *
  */
-@RunWith( JukitoRunner.class )
-@UseModules( { TestGraphModule.class } )
+@RunWith(JukitoRunner.class)
+@UseModules({ TestGraphModule.class })
 public class EdgeWriteRepairTest {
 
     private static final Logger LOG = LoggerFactory.getLogger( EdgeWriteRepairTest.class );
@@ -120,11 +116,9 @@ public class EdgeWriteRepairTest {
     }
 
 
-
     /**
-     * Test repairing with no edges
-     * TODO: TN.  There appears to be a race condition here with ordering.  Not sure if this is intentional as part of the impl
-     * or if it's an issue
+     * Test repairing with no edges.  TODO: TN.  There appears to be a race condition here with ordering.  Not sure
+     * if this is intentional as part of the impl or if it's an issue
      */
     @Test
     public void versionTest() throws ConnectionException {
@@ -149,11 +143,9 @@ public class EdgeWriteRepairTest {
 
             LOG.info( "Writing edge at index [{}] {}", i, edge );
 
-            if(i < deleteIndex){
+            if ( i < deleteIndex ) {
                 deletedEdges.add( edge );
             }
-
-
         }
 
 
@@ -165,38 +157,37 @@ public class EdgeWriteRepairTest {
 
         for ( MarkedEdge edge : edges ) {
 
-            LOG.info("Returned edge {} for repair", edge);
+            LOG.info( "Returned edge {} for repair", edge );
 
-           final boolean shouldBeDeleted = deletedEdges.contains( edge );
+            final boolean shouldBeDeleted = deletedEdges.contains( edge );
 
             assertTrue( "Removed matches saved index", shouldBeDeleted );
 
             deletedStream.add( edge );
-
         }
 
         deletedEdges.removeAll( deletedStream.elementSet() );
 
-        assertEquals(0, deletedEdges.size());
+        assertEquals( 0, deletedEdges.size() );
 
         //now verify we get all the versions we expect back
-        Iterator<MarkedEdge> iterator = edgeSerialization.getEdgeFromSource( scope,
+        Iterator<MarkedEdge> iterator = edgeSerialization.getEdgeVersions( scope,
                 new SimpleSearchByEdge( sourceId, edgeType, targetId, UUIDGenerator.newTimeUUID(), null ) );
 
         int count = 0;
 
-        for(MarkedEdge edge: new IterableWrapper<MarkedEdge>( iterator )){
+        for ( MarkedEdge edge : new IterableWrapper<MarkedEdge>( iterator ) ) {
 
-            final Edge saved = versions.get( size - count -1 );
+            final Edge saved = versions.get( size - count - 1 );
 
-            assertEquals(saved, edge);
+            assertEquals( saved, edge );
 
             count++;
         }
 
-        final int keptCount = size-deleteIndex;
+        final int keptCount = size - deleteIndex;
 
-        assertEquals("Kept edge version was the minimum", keptCount, count);
+        assertEquals( "Kept edge version was the minimum", keptCount, count );
     }
 
 
@@ -214,7 +205,4 @@ public class EdgeWriteRepairTest {
             return this.sourceIterator;
         }
     }
-
-
-
 }