You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/27 22:21:28 UTC

[14/50] [abbrv] Merged hystrix into asyncqueue

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerialization.java
index 548d821..9344e70 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerialization.java
@@ -65,12 +65,12 @@ public interface EdgeSerialization {
 
 
     /**
-     * Search for specific versions of the edge from source->target. Will return all versions.
+     * Search for all versions of this edge < the search version.  Returns all versions
      * @param scope
      * @param search
      * @return
      */
-    Iterator<MarkedEdge> getEdgeFromSource( OrganizationScope scope, SearchByEdge search );
+    Iterator<MarkedEdge> getEdgeVersions( OrganizationScope scope, SearchByEdge search );
 
     /**
      * Get an iterator of all edges by edge type originating from source node
@@ -97,14 +97,6 @@ public interface EdgeSerialization {
      */
     Iterator<MarkedEdge> getEdgesToTarget( OrganizationScope scope, SearchByEdgeType edgeType );
 
-    /**
-     * Search for specific versions of the edge from source->target. Will return all versions
-     * @param scope
-     * @param search
-     * @return
-     */
-    Iterator<MarkedEdge> getEdgeToTarget( OrganizationScope scope, SearchByEdge search );
-
 
     /**
      * Get an iterator of all edges by edge type pointing to the target node.  Also uses the source id type to limit the

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index 7e11206..d255929 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -85,6 +85,8 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
     //row key with target id type
     private static final RowTypeSerializer ROW_TYPE_SERIALIZER = new RowTypeSerializer();
 
+    private static final EdgeRowKeySerializer EDGE_ROW_KEY_SERIALIZER = new EdgeRowKeySerializer();
+
     //Edge serializers
     private static final EdgeSerializer EDGE_SERIALIZER = new EdgeSerializer();
 
@@ -124,6 +126,15 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
                     new OrganizationScopedRowKeySerializer<RowKeyType>( ROW_TYPE_SERIALIZER ), EDGE_SERIALIZER );
 
 
+    /**
+     * Get all graph edge versions
+     */
+    private static final MultiTennantColumnFamily<OrganizationScope, EdgeRowKey, UUID> GRAPH_EDGE_VERSIONS =
+            new MultiTennantColumnFamily<OrganizationScope, EdgeRowKey, UUID>( "Graph_Edge_Versions",
+                    new OrganizationScopedRowKeySerializer<EdgeRowKey>( EDGE_ROW_KEY_SERIALIZER ),
+                    UUIDSerializer.get() );
+
+
     protected final Keyspace keyspace;
     protected final CassandraConfig cassandraConfig;
     protected final GraphFig graphFig;
@@ -140,16 +151,23 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
     @Override
     public MutationBatch writeEdge( final OrganizationScope scope, final Edge edge ) {
-        final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() );;
+        final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL());
 
 
-        doWrite( scope, edge, new RowOp() {
+        doWrite( scope, edge, new RowOp<RowKey>() {
             @Override
-            public void doWrite( final MultiTennantColumnFamily columnFamily, final Object rowKey,
-                                 final DirectedEdge edge ) {
-
+            public <R extends RowKey> void doWrite(
+                    final MultiTennantColumnFamily<OrganizationScope, R, DirectedEdge> columnFamily, final R rowKey,
+                    final DirectedEdge edge ) {
                 batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( edge, false );
             }
+
+
+            @Override
+            public void doWrite( final MultiTennantColumnFamily<OrganizationScope, EdgeRowKey, UUID> columnFamily,
+                                 final EdgeRowKey rowKey, final UUID version ) {
+                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( version, false );
+            }
         } );
 
 
@@ -159,16 +177,23 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
     @Override
     public MutationBatch markEdge( final OrganizationScope scope, final Edge edge ) {
-        final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() );;
+        final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() );
 
 
-        doWrite( scope, edge, new RowOp() {
+        doWrite( scope, edge, new RowOp<RowKey>() {
             @Override
-            public void doWrite( final MultiTennantColumnFamily columnFamily, final Object rowKey,
-                                 final DirectedEdge edge ) {
-
+            public <R extends RowKey> void doWrite(
+                    final MultiTennantColumnFamily<OrganizationScope, R, DirectedEdge> columnFamily, final R rowKey,
+                    final DirectedEdge edge ) {
                 batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( edge, true );
             }
+
+
+            @Override
+            public void doWrite( final MultiTennantColumnFamily<OrganizationScope, EdgeRowKey, UUID> columnFamily,
+                                 final EdgeRowKey rowKey, final UUID version ) {
+                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( version, true );
+            }
         } );
 
 
@@ -181,13 +206,20 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
         final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() );
 
 
-        doWrite( scope, edge, new RowOp() {
+        doWrite( scope, edge, new RowOp<RowKey>() {
             @Override
-            public void doWrite( final MultiTennantColumnFamily columnFamily, final Object rowKey,
-                                 final DirectedEdge edge ) {
-
+            public <R extends RowKey> void doWrite(
+                    final MultiTennantColumnFamily<OrganizationScope, R, DirectedEdge> columnFamily, final R rowKey,
+                    final DirectedEdge edge ) {
                 batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( edge );
             }
+
+
+            @Override
+            public void doWrite( final MultiTennantColumnFamily<OrganizationScope, EdgeRowKey, UUID> columnFamily,
+                                 final EdgeRowKey rowKey, final UUID version ) {
+                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( version );
+            }
         } );
 
 
@@ -197,7 +229,8 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
     /**
      * EdgeWrite the edges internally
-     * @param scope  The scope to encapsulate
+     *
+     * @param scope The scope to encapsulate
      * @param edge The edge to write
      * @param op The row operation to invoke
      */
@@ -229,23 +262,35 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
         final DirectedEdge targetEdge = new DirectedEdge( sourceNodeId, version );
 
 
+        final EdgeRowKey edgeRowKey = new EdgeRowKey( sourceNodeId, type, targetNodeId );
+
+
         /**
-         * EdgeWrite edges from target<-source
+         * write edges from source->target
          */
 
-
         op.doWrite( GRAPH_SOURCE_NODE_EDGES, sourceRowKey, sourceEdge );
 
         op.doWrite( GRAPH_SOURCE_NODE_TARGET_TYPE, sourceRowKeyType, sourceEdge );
 
+
+        /**
+         * write edges from target<-source
+         */
         op.doWrite( GRAPH_TARGET_NODE_EDGES, targetRowKey, targetEdge );
 
         op.doWrite( GRAPH_TARGET_NODE_SOURCE_TYPE, targetRowKeyType, targetEdge );
+
+
+        /**
+         * Write this in the version log for this edge of source->target
+         */
+        op.doWrite( GRAPH_EDGE_VERSIONS, edgeRowKey, version );
     }
 
 
     @Override
-    public Iterator<MarkedEdge> getEdgeFromSource( final OrganizationScope scope, final SearchByEdge search ) {
+    public Iterator<MarkedEdge> getEdgeVersions( final OrganizationScope scope, final SearchByEdge search ) {
         ValidationUtils.validateOrganizationScope( scope );
         EdgeUtils.validateSearchByEdge( search );
 
@@ -387,52 +432,6 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
 
     @Override
-    public Iterator<MarkedEdge> getEdgeToTarget( final OrganizationScope scope, final SearchByEdge search ) {
-        ValidationUtils.validateOrganizationScope( scope );
-        EdgeUtils.validateSearchByEdge( search );
-
-        final Id sourceId = search.sourceNode();
-        final Id targetId = search.targetNode();
-        final UUID maxVersion = search.getMaxVersion();
-        final String type = search.getType();
-
-        return getEdges( GRAPH_TARGET_NODE_EDGES, new EdgeSearcher<RowKey>( scope, search.last() ) {
-
-
-            @Override
-            public void setRange( final RangeBuilder builder ) {
-                if ( last.isPresent() ) {
-                    super.setRange( builder );
-                    return;
-                }
-
-                //set the last value in the range based on the max version
-                final DirectedEdge colValue = new DirectedEdge( sourceId, maxVersion );
-                builder.setStart( colValue, EDGE_SERIALIZER );
-            }
-
-
-            @Override
-            protected RowKey generateRowKey() {
-                return new RowKey( targetId, type );
-            }
-
-
-            @Override
-            protected DirectedEdge getStartColumn( final Edge last ) {
-                return new DirectedEdge( last.getSourceNode(), last.getVersion() );
-            }
-
-
-            @Override
-            protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
-                return new SimpleMarkedEdge( edge.id, type, targetId, edge.version, marked );
-            }
-        } );
-    }
-
-
-    @Override
     public Iterator<MarkedEdge> getEdgesToTargetBySourceType( final OrganizationScope scope,
                                                               final SearchByIdType edgeType ) {
 
@@ -490,20 +489,21 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
 
         RowQuery<ScopedRowKey<OrganizationScope, R>, DirectedEdge> query =
-                keyspace.prepareQuery( cf ).setConsistencyLevel( cassandraConfig.getReadCL() ).getKey( rowKey ).autoPaginate( true )
-                        .withColumnRange( rangeBuilder.build() );
-
+                keyspace.prepareQuery( cf ).setConsistencyLevel( cassandraConfig.getReadCL() ).getKey( rowKey )
+                        .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
 
-        return new ColumnNameIterator<DirectedEdge, MarkedEdge>( query, searcher,
-                    searcher.hasPage(), graphFig.getReadTimeout() );
 
+        return new ColumnNameIterator<DirectedEdge, MarkedEdge>( query, searcher, searcher.hasPage() );
     }
 
 
     @Override
     public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
         return Arrays.asList( graphCf( GRAPH_SOURCE_NODE_EDGES ), graphCf( GRAPH_TARGET_NODE_EDGES ),
-                graphCf( GRAPH_SOURCE_NODE_TARGET_TYPE ), graphCf( GRAPH_TARGET_NODE_SOURCE_TYPE ) );
+                graphCf( GRAPH_SOURCE_NODE_TARGET_TYPE ), graphCf( GRAPH_TARGET_NODE_SOURCE_TYPE ),
+                new MultiTennantColumnFamilyDefinition( GRAPH_EDGE_VERSIONS, BytesType.class.getSimpleName(),
+                        ColumnTypes.UUID_TYPE_REVERSED, BytesType.class.getSimpleName(),
+                        MultiTennantColumnFamilyDefinition.CacheOption.KEYS ) );
     }
 
 
@@ -511,8 +511,9 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
      * Helper to generate an edge definition by the type
      */
     private MultiTennantColumnFamilyDefinition graphCf( MultiTennantColumnFamily cf ) {
-        return new MultiTennantColumnFamilyDefinition( cf,
-                BytesType.class.getSimpleName(), ColumnTypes.DYNAMIC_COMPOSITE_TYPE, BytesType.class.getSimpleName() , MultiTennantColumnFamilyDefinition.CacheOption.KEYS);
+        return new MultiTennantColumnFamilyDefinition( cf, BytesType.class.getSimpleName(),
+                ColumnTypes.DYNAMIC_COMPOSITE_TYPE, BytesType.class.getSimpleName(),
+                MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
     }
 
 
@@ -558,11 +559,12 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
             Preconditions.checkArgument( composite.size() == 3, "Composite should have 3 elements" );
 
 
-            //parse our id
-            final Id id = ID_COL_SERIALIZER.fromComposite( composite, 0 );
-
             //return the version
-            final UUID version = composite.get( 2, UUID_SERIALIZER );
+            final UUID version = composite.get( 0, UUID_SERIALIZER );
+
+
+            //parse our id
+            final Id id = ID_COL_SERIALIZER.fromComposite( composite, 1 );
 
 
             return new DirectedEdge( id, version );
@@ -575,11 +577,13 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
         private DynamicComposite createComposite( DirectedEdge edge, AbstractComposite.ComponentEquality equality ) {
             DynamicComposite composite = new DynamicComposite();
 
-            ID_COL_SERIALIZER.toComposite( composite, edge.id );
-
             //add our edge
             composite.addComponent( edge.version, UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED, equality );
 
+
+            ID_COL_SERIALIZER.toComposite( composite, edge.id );
+
+
             return composite;
         }
     }
@@ -646,6 +650,23 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
 
     /**
+     * Used to store row keys by sourceId, targetId and edgeType
+     */
+    private static class EdgeRowKey {
+        public final Id sourceId;
+        public final Id targetId;
+        public final String edgeType;
+
+
+        private EdgeRowKey( final Id sourceId, final String edgeType, final Id targetId ) {
+            this.sourceId = sourceId;
+            this.targetId = targetId;
+            this.edgeType = edgeType;
+        }
+    }
+
+
+    /**
      * Searcher to be used when performing the search.  Performs I/O transformation as well as parsing for the iterator
      */
     private static abstract class EdgeSearcher<R> implements ColumnParser<DirectedEdge, MarkedEdge> {
@@ -779,11 +800,51 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
 
     /**
+     * Class to perform serialization for row keys from edges
+     */
+    private static class EdgeRowKeySerializer implements CompositeFieldSerializer<EdgeRowKey> {
+
+        private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+
+        @Override
+        public void toComposite( final CompositeBuilder builder, final EdgeRowKey key ) {
+
+            //add the row id to the composite
+            ID_SER.toComposite( builder, key.sourceId );
+            builder.addString( key.edgeType );
+            ID_SER.toComposite( builder, key.targetId );
+        }
+
+
+        @Override
+        public EdgeRowKey fromComposite( final CompositeParser composite ) {
+
+            final Id sourceId = ID_SER.fromComposite( composite );
+            final String edgeType = composite.readString();
+            final Id targetId = ID_SER.fromComposite( composite );
+
+            return new EdgeRowKey( sourceId, edgeType, targetId );
+        }
+    }
+
+
+    /**
      * Simple callback to perform puts and deletes with a common row setup code
      */
     private static interface RowOp<R> {
 
-        void doWrite( final MultiTennantColumnFamily<OrganizationScope, R, DirectedEdge> columnFamily, R rowKey,
-                      DirectedEdge edge );
+        /**
+         * Write the edge with the given data
+         */
+        <R extends RowKey> void doWrite(
+                final MultiTennantColumnFamily<OrganizationScope, R, DirectedEdge> columnFamily, R rowKey,
+                DirectedEdge edge );
+
+        /**
+         * Write the edge into the version cf
+         */
+        void doWrite( final MultiTennantColumnFamily<OrganizationScope, EdgeRowKey, UUID> columnFamily,
+                      EdgeRowKey rowKey, UUID version );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerIT.java
index 17a5f5a..379f56a 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerIT.java
@@ -46,6 +46,7 @@ import com.google.inject.Inject;
 import rx.Observable;
 
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createGetByEdge;
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdge;
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdgeAndId;
@@ -72,7 +73,7 @@ public class EdgeManagerIT {
 
 
     @Inject
-    protected EdgeManagerFactory emf;
+    protected GraphManagerFactory emf;
 
     protected OrganizationScope scope;
 
@@ -93,7 +94,7 @@ public class EdgeManagerIT {
     @Test
     public void testWriteReadEdgeTypeSource() {
 
-        EdgeManager em = emf.createEdgeManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
 
         Edge edge = createEdge( "source", "test", "target" );
@@ -126,7 +127,7 @@ public class EdgeManagerIT {
     @Test
     public void testWriteReadEdgeTypeTarget() {
 
-        EdgeManager em = emf.createEdgeManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
 
         Edge edge = createEdge( "source", "test", "target" );
@@ -159,7 +160,7 @@ public class EdgeManagerIT {
     @Test
     public void testWriteReadEdgeTypeVersionSource() {
 
-        EdgeManager em = emf.createEdgeManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
         final UUID earlyVersion = UUIDGenerator.newTimeUUID();
 
@@ -194,7 +195,7 @@ public class EdgeManagerIT {
     @Test
     public void testWriteReadEdgeTypeVersionTarget() {
 
-        EdgeManager em = emf.createEdgeManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
 
         final UUID earlyVersion = UUIDGenerator.newTimeUUID();
@@ -233,7 +234,7 @@ public class EdgeManagerIT {
     @Test
     public void testWriteReadEdgeTypeVersionSourceDistinct() {
 
-        EdgeManager em = emf.createEdgeManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
         final UUID earlyVersion = UUIDGenerator.newTimeUUID();
 
@@ -266,6 +267,8 @@ public class EdgeManagerIT {
         Iterator<Edge> returned = edges.toBlockingObservable().getIterator();
 
         assertEquals( "Correct edge returned", edge3, returned.next() );
+        assertEquals( "Correct edge returned", edge2, returned.next() );
+        assertEquals( "Correct edge returned", edge1, returned.next() );
         assertFalse( "No more edges", returned.hasNext() );
 
         //now test with an earlier version, we shouldn't get the edge back
@@ -276,6 +279,7 @@ public class EdgeManagerIT {
         returned = edges.toBlockingObservable().getIterator();
 
         assertEquals( "Correct edge returned", edge2, returned.next() );
+        assertEquals( "Correct edge returned", edge1, returned.next() );
         assertFalse( "No more edges", returned.hasNext() );
 
         search = createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getVersion(), null );
@@ -302,7 +306,7 @@ public class EdgeManagerIT {
     public void testWriteReadEdgeTypeVersionTargetDistinct() {
 
 
-        EdgeManager em = emf.createEdgeManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
         final UUID earlyVersion = UUIDGenerator.newTimeUUID();
 
@@ -335,6 +339,8 @@ public class EdgeManagerIT {
         Iterator<Edge> returned = edges.toBlockingObservable().getIterator();
 
         assertEquals( "Correct edge returned", edge3, returned.next() );
+        assertEquals( "Correct edge returned", edge2, returned.next() );
+        assertEquals( "Correct edge returned", edge1, returned.next() );
         assertFalse( "No more edges", returned.hasNext() );
 
         //now test with an earlier version, we shouldn't get the edge back
@@ -345,6 +351,7 @@ public class EdgeManagerIT {
         returned = edges.toBlockingObservable().getIterator();
 
         assertEquals( "Correct edge returned", edge2, returned.next() );
+        assertEquals( "Correct edge returned", edge1, returned.next() );
         assertFalse( "No more edges", returned.hasNext() );
 
         search = createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge1.getVersion(), null );
@@ -370,7 +377,7 @@ public class EdgeManagerIT {
     @Test
     public void testWriteReadEdgeTypePagingSource() {
 
-        EdgeManager em = emf.createEdgeManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
         final Id sourceId = createId( "source" );
 
@@ -391,7 +398,7 @@ public class EdgeManagerIT {
         //now test retrieving it
 
         SearchByEdgeType search =
-                createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getVersion(), null );
+                createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getVersion(), null );
 
         Observable<Edge> edges = em.loadEdgesFromSource( search );
 
@@ -400,10 +407,15 @@ public class EdgeManagerIT {
 
 
         //we have 3 edges, but we specified our first edge as the max, we shouldn't get any more results than the first
+        assertEquals( "Correct edge returned", edge3, returned.next() );
+
+        assertEquals( "Correct edge returned", edge2, returned.next() );
+
         assertEquals( "Correct edge returned", edge1, returned.next() );
 
         assertFalse( "No more edges", returned.hasNext() );
 
+        //still edge 3 is our max version, but we start with edge 2 as our last read
         search = createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getVersion(), edge2 );
 
         edges = em.loadEdgesFromSource( search );
@@ -411,7 +423,7 @@ public class EdgeManagerIT {
         //implicitly blows up if more than 1 is returned from "single"
         returned = edges.toBlockingObservable().getIterator();
 
-        assertEquals( "Paged correctly", edge3, returned.next() );
+        assertEquals( "Paged correctly", edge1, returned.next() );
 
         assertFalse( "End of stream", returned.hasNext() );
     }
@@ -421,7 +433,7 @@ public class EdgeManagerIT {
     public void testWriteReadEdgeTypePagingTarget() {
 
 
-        EdgeManager em = emf.createEdgeManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
 
         final Id targetId = createId( "target" );
@@ -442,7 +454,7 @@ public class EdgeManagerIT {
         //now test retrieving it
 
         SearchByEdgeType search =
-                createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge1.getVersion(), null );
+                createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getVersion(), null );
 
         Observable<Edge> edges = em.loadEdgesToTarget( search );
 
@@ -451,6 +463,10 @@ public class EdgeManagerIT {
 
 
         //we have 3 edges, but we specified our first edge as the max, we shouldn't get any more results than the first
+        assertEquals( "Correct edge returned", edge3, returned.next() );
+
+        assertEquals( "Correct edge returned", edge2, returned.next() );
+
         assertEquals( "Correct edge returned", edge1, returned.next() );
 
 
@@ -463,7 +479,7 @@ public class EdgeManagerIT {
         //implicitly blows up if more than 1 is returned from "single"
         returned = edges.toBlockingObservable().getIterator();
 
-        assertEquals( "Paged correctly", edge3, returned.next() );
+        assertEquals( "Paged correctly", edge1, returned.next() );
 
         assertFalse( "End of stream", returned.hasNext() );
     }
@@ -472,7 +488,7 @@ public class EdgeManagerIT {
     @Test
     public void testWriteReadEdgeTypeTargetTypeSource() {
 
-        EdgeManager em = emf.createEdgeManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
 
         Edge edge = createEdge( "source", "test", "target" );
@@ -508,7 +524,7 @@ public class EdgeManagerIT {
     @Test
     public void testWriteReadEdgeTypeTargetTypeTarget() {
 
-        EdgeManager em = emf.createEdgeManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
 
         Edge edge = createEdge( "source", "test", "target" );
@@ -544,7 +560,7 @@ public class EdgeManagerIT {
     @Test
     public void testWriteReadEdgeDeleteSource() {
 
-        EdgeManager em = emf.createEdgeManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
 
         Edge edge = createEdge( "source", "test", "target" );
@@ -573,9 +589,20 @@ public class EdgeManagerIT {
 
         assertEquals( "Correct edge returned", edge, returned );
 
+        final SearchByEdge searchByEdge = createGetByEdge(edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getVersion(), null);
+
+        returned = em.loadEdgeVersions(searchByEdge).toBlockingObservable().single();
+
+
+        assertEquals( "Correct edge returned", edge, returned );
+
 
         //now delete it
-        em.deleteEdge( edge ).toBlockingObservable().last();
+        returned = em.deleteEdge( edge ).toBlockingObservable().last();
+
+
+        assertEquals( "Correct edge returned", edge, returned );
+
 
         //now test retrieval, should be null
         edges = em.loadEdgesFromSource( search );
@@ -586,6 +613,7 @@ public class EdgeManagerIT {
         assertNull( "No edge returned", returned );
 
 
+
         //no search by type, should be null as well
 
         edges = em.loadEdgesFromSourceByType( searchById );
@@ -594,13 +622,17 @@ public class EdgeManagerIT {
         returned = edges.toBlockingObservable().singleOrDefault( null );
 
         assertNull( "No edge returned", returned );
+
+        returned = em.loadEdgeVersions(searchByEdge).toBlockingObservable().singleOrDefault(null);
+
+        assertNull( "No edge returned", returned );
     }
 
 
     @Test
     public void testWriteReadEdgeDeleteTarget() {
 
-        EdgeManager em = emf.createEdgeManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
 
         Edge edge = createEdge( "source", "test", "target" );
@@ -656,7 +688,7 @@ public class EdgeManagerIT {
     @Test
     public void testWriteReadEdgeTypesSourceTypes() {
 
-        final EdgeManager em = emf.createEdgeManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         Id sourceId = new SimpleId( "source" );
         Id targetId1 = new SimpleId( "target" );
@@ -718,7 +750,7 @@ public class EdgeManagerIT {
     @Test
     public void testWriteReadEdgeTypesTargetTypes() {
 
-        final EdgeManager em = emf.createEdgeManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id sourceId2 = new SimpleId( "source2" );
@@ -783,7 +815,7 @@ public class EdgeManagerIT {
     @Test
     public void testWriteReadEdgeTypesSourceTypesPaging() {
 
-        final EdgeManager em = emf.createEdgeManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id targetId1 = new SimpleId( "target" );
@@ -861,7 +893,7 @@ public class EdgeManagerIT {
     @Test
     public void testWriteReadEdgeTypesTargetTypesPaging() {
 
-        final EdgeManager em = emf.createEdgeManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id sourceId2 = new SimpleId( "source2" );
@@ -942,7 +974,7 @@ public class EdgeManagerIT {
     @Test
     public void testMarkSourceEdges() {
 
-        final EdgeManager em = emf.createEdgeManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         Id sourceId = new SimpleId( "source" );
         Id targetId1 = new SimpleId( "target" );
@@ -968,10 +1000,10 @@ public class EdgeManagerIT {
         Iterator<Edge> results = edges.toBlockingObservable().getIterator();
 
 
-        assertEquals( "Edges correct", edge1, results.next() );
-
         assertEquals( "Edges correct", edge2, results.next() );
 
+        assertEquals( "Edges correct", edge1, results.next() );
+
         assertFalse( "No more edges", results.hasNext() );
 
         //now delete one of the edges
@@ -1011,7 +1043,7 @@ public class EdgeManagerIT {
     @Test
     public void testMarkTargetEdges() {
 
-        final EdgeManager em = emf.createEdgeManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id sourceId2 = new SimpleId( "source2" );
@@ -1037,9 +1069,10 @@ public class EdgeManagerIT {
         Iterator<Edge> results = edges.toBlockingObservable().getIterator();
 
 
+        assertEquals( "Edges correct", edge2, results.next() );
+
         assertEquals( "Edges correct", edge1, results.next() );
 
-        assertEquals( "Edges correct", edge2, results.next() );
 
         assertFalse( "No more edges", results.hasNext() );
 
@@ -1078,7 +1111,7 @@ public class EdgeManagerIT {
     @Test
     public void testMarkSourceEdgesType() {
 
-        final EdgeManager em = emf.createEdgeManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         Id sourceId = new SimpleId( "source" );
         Id targetId1 = new SimpleId( "target" );
@@ -1156,7 +1189,7 @@ public class EdgeManagerIT {
     @Test
     public void testMarkTargetEdgesType() {
 
-        final EdgeManager em = emf.createEdgeManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id sourceId2 = new SimpleId( "source2" );
@@ -1234,7 +1267,7 @@ public class EdgeManagerIT {
     @Test
     public void markSourceNode() {
 
-        final EdgeManager em = emf.createEdgeManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         Id sourceId = new SimpleId( "source" );
         Id targetId1 = new SimpleId( "target" );
@@ -1256,10 +1289,10 @@ public class EdgeManagerIT {
                   .toBlockingObservable().getIterator();
 
 
-        assertEquals( "Edge found", edge1, results.next() );
-
         assertEquals( "Edge found", edge2, results.next() );
 
+        assertEquals( "Edge found", edge1, results.next() );
+
         assertFalse( "No more edges", results.hasNext() );
 
 
@@ -1313,12 +1346,10 @@ public class EdgeManagerIT {
     }
 
 
-
-
     @Test
     public void markTargetNode() {
 
-        final EdgeManager em = emf.createEdgeManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id sourceId2 = new SimpleId( "source2" );
@@ -1340,10 +1371,10 @@ public class EdgeManagerIT {
                   .toBlockingObservable().getIterator();
 
 
-        assertEquals( "Edge found", edge1, results.next() );
-
         assertEquals( "Edge found", edge2, results.next() );
 
+        assertEquals( "Edge found", edge1, results.next() );
+
         assertFalse( "No more edges", results.hasNext() );
 
 
@@ -1397,10 +1428,9 @@ public class EdgeManagerIT {
     }
 
 
-
     @Test(expected = NullPointerException.class)
     public void invalidEdgeTypesWrite( @All Edge edge ) {
-        final EdgeManager em = emf.createEdgeManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         em.writeEdge( edge );
     }
@@ -1408,7 +1438,7 @@ public class EdgeManagerIT {
 
     @Test(expected = NullPointerException.class)
     public void invalidEdgeTypesDelete( @All Edge edge ) {
-        final EdgeManager em = emf.createEdgeManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         em.deleteEdge( edge );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerStressTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerStressTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerStressTest.java
index 773e3ee..9d8db6e 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerStressTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerStressTest.java
@@ -28,6 +28,7 @@ import org.jukito.JukitoRunner;
 import org.jukito.UseModules;
 import org.junit.Before;
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -58,13 +59,13 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 
-@RunWith( JukitoRunner.class )
-@UseModules( TestGraphModule.class )
+@RunWith(JukitoRunner.class)
+@UseModules(TestGraphModule.class)
 public class EdgeManagerStressTest {
     private static final Logger log = LoggerFactory.getLogger( EdgeManagerStressTest.class );
 
     @Inject
-    private EdgeManagerFactory factory;
+    private GraphManagerFactory factory;
 
     @ClassRule
     public static CassandraRule rule = new CassandraRule();
@@ -91,6 +92,7 @@ public class EdgeManagerStressTest {
 
 
     @Test
+    @Ignore
     public void writeThousands() throws InterruptedException {
         EdgeGenerator generator = new EdgeGenerator() {
 
@@ -108,7 +110,7 @@ public class EdgeManagerStressTest {
 
 
             @Override
-            public Observable<Edge> doSearch( final EdgeManager manager ) {
+            public Observable<Edge> doSearch( final GraphManager manager ) {
 
 
                 final UUID uuid = UUIDGenerator.newTimeUUID();
@@ -176,6 +178,8 @@ public class EdgeManagerStressTest {
         doTest( generator );
     }
 
+
+    @Ignore
     @Test
     public void writeThousandsSingleSource() throws InterruptedException {
         EdgeGenerator generator = new EdgeGenerator() {
@@ -193,7 +197,7 @@ public class EdgeManagerStressTest {
 
 
             @Override
-            public Observable<Edge> doSearch( final EdgeManager manager ) {
+            public Observable<Edge> doSearch( final GraphManager manager ) {
                 UUID uuid = UUIDGenerator.newTimeUUID();
 
                 return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", uuid, null ) );
@@ -203,41 +207,41 @@ public class EdgeManagerStressTest {
         doTest( generator );
     }
 
+
     @Test
-       public void writeThousandsSingleTarget() throws InterruptedException {
-           EdgeGenerator generator = new EdgeGenerator() {
+    @Ignore
+    public void writeThousandsSingleTarget() throws InterruptedException {
+        EdgeGenerator generator = new EdgeGenerator() {
 
-               private Id targetId = createId( "target" );
+            private Id targetId = createId( "target" );
 
 
-               @Override
-               public Edge newEdge() {
-                   Edge edge = createEdge( createId( "source" ), "test", targetId );
+            @Override
+            public Edge newEdge() {
+                Edge edge = createEdge( createId( "source" ), "test", targetId );
 
 
-                   return edge;
-               }
+                return edge;
+            }
 
 
-               @Override
-               public Observable<Edge> doSearch( final EdgeManager manager ) {
-                   UUID uuid = UUIDGenerator.newTimeUUID();
+            @Override
+            public Observable<Edge> doSearch( final GraphManager manager ) {
+                UUID uuid = UUIDGenerator.newTimeUUID();
 
-                   return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", uuid, null ) );
-               }
-           };
+                return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", uuid, null ) );
+            }
+        };
 
-           doTest( generator );
-       }
+        doTest( generator );
+    }
 
 
     /**
      * Execute the test with the generator
-     * @param generator
-     * @throws InterruptedException
      */
     private void doTest( EdgeGenerator generator ) throws InterruptedException {
-        EdgeManager manager = factory.createEdgeManager( scope );
+        GraphManager manager = factory.createEdgeManager( scope );
 
         int limit = 10000;
 
@@ -308,6 +312,6 @@ public class EdgeManagerStressTest {
          */
         public Edge newEdge();
 
-        public Observable<Edge> doSearch( final EdgeManager manager );
+        public Observable<Edge> doSearch( final GraphManager manager );
     }
 }