You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/25 16:21:13 UTC

[14/43] git commit: Finished serialization I/O impls

Finished serialization I/O impls


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/078ca7dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/078ca7dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/078ca7dc

Branch: refs/heads/two-dot-o
Commit: 078ca7dca71ce62b7a5ae0469f7485563582145c
Parents: 96e6f5f
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 14 15:14:11 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 14 15:14:11 2014 -0700

----------------------------------------------------------------------
 .../persistence/graph/guice/GraphModule.java    |   4 +
 .../graph/impl/stage/AbstractEdgeRepair.java    | 152 +++++++++++++++
 .../graph/impl/stage/EdgeDeleteRepairImpl.java  |  76 ++++++++
 .../graph/impl/stage/EdgeWriteRepairImpl.java   |  95 +---------
 .../graph/impl/stage/EdgeDeleteRepairTest.java  | 189 +++++++++++++++++++
 .../graph/impl/stage/EdgeWriteRepairTest.java   |   2 +
 6 files changed, 431 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index c81dc82..a5dda9a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -34,6 +34,8 @@ import org.apache.usergrid.persistence.graph.consistency.LocalTimeoutQueue;
 import org.apache.usergrid.persistence.graph.consistency.TimeoutQueue;
 import org.apache.usergrid.persistence.graph.impl.CollectionIndexObserver;
 import org.apache.usergrid.persistence.graph.impl.EdgeManagerImpl;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteRepair;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteRepairImpl;
 import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepair;
 import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepairImpl;
 import org.apache.usergrid.persistence.graph.impl.stage.EdgeWriteRepair;
@@ -107,6 +109,8 @@ public class GraphModule extends AbstractModule {
         bind( EdgeMetaRepair.class).to( EdgeMetaRepairImpl.class );
 
         bind( EdgeWriteRepair.class).to( EdgeWriteRepairImpl.class );
+
+        bind( EdgeDeleteRepair.class).to( EdgeDeleteRepairImpl.class );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
new file mode 100644
index 0000000..838de69
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Func1;
+
+
+/**
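+ * Base repair operation: merges the source and target edge versions, keeps those accepted by the subclass filter, and deletes them in batches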
+ *
+ */
+@Singleton
+public abstract class AbstractEdgeRepair  {
+
+    protected final EdgeSerialization edgeSerialization;
+    protected final GraphFig graphFig;
+    protected final Keyspace keyspace;
+    protected final Scheduler scheduler;
+
+
+    @Inject
+    public AbstractEdgeRepair( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
+                               final Keyspace keyspace, final Scheduler scheduler ) {
+        this.edgeSerialization = edgeSerialization;
+        this.graphFig = graphFig;
+        this.keyspace = keyspace;
+        this.scheduler = scheduler;
+    }
+
+
+
+    public Observable<MarkedEdge> repair( final OrganizationScope scope, final Edge edge ) {
+
+        final UUID maxVersion = edge.getVersion();
+
+        //get source edges
+        Observable<MarkedEdge> sourceEdges = getEdgeVersionsFromSource( scope, edge );
+
+        //get target edges
+        Observable<MarkedEdge> targetEdges = getEdgeVersionsToTarget( scope, edge );
+
+
+        //merge source and target then deal with the distinct values
+        return Observable.merge( sourceEdges, targetEdges ).filter(getFilter(maxVersion)).distinctUntilChanged().buffer( graphFig.getScanPageSize() )
+                         .flatMap( new Func1<List<MarkedEdge>, Observable<MarkedEdge>>() {
+                             @Override
+                             public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
+                                 final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                                 for ( MarkedEdge edge : markedEdges ) {
+                                     final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
+
+                                     batch.mergeShallow( delete );
+                                 }
+
+                                 try {
+                                     batch.execute();
+                                 }
+                                 catch ( ConnectionException e ) {
+                                     throw new RuntimeException( "Unable to issue write to cassandra", e );
+                                 }
+
+                                 return Observable.from( markedEdges ).subscribeOn( scheduler );
+                             }
+             } );
+    }
+
+
+    /**
+     * A filter used to filter out edges appropriately for the max version
+     *
+     * @param maxVersion the max version to use
+     * @return The filter to use in the max version
+     */
+    protected abstract Func1<MarkedEdge, Boolean> getFilter(final UUID maxVersion);
+
+    /**
+     * Get all edge versions <= the specified max from the source
+     */
+    private Observable<MarkedEdge> getEdgeVersionsFromSource( final OrganizationScope scope, final Edge edge ) {
+
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+
+                final SimpleSearchByEdge search =
+                        new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
+                                edge.getVersion(), null );
+
+                return edgeSerialization.getEdgeFromSource( scope, search );
+            }
+        } ).subscribeOn( scheduler );
+    }
+
+
+    /**
+     * Get all edge versions <= the specified max to the target
+     */
+    private Observable<MarkedEdge> getEdgeVersionsToTarget( final OrganizationScope scope, final Edge edge ) {
+
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+
+                final SimpleSearchByEdge search =
+                        new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
+                                edge.getVersion(), null );
+
+                return edgeSerialization.getEdgeToTarget( scope, search );
+            }
+        } ).subscribeOn( scheduler );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
new file mode 100644
index 0000000..bf5d3d2
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Func1;
+
+
+/**
+ * Repair operation for edge deletes: removes every stored edge version <= the version of the supplied edge
+ */
+@Singleton
+public class EdgeDeleteRepairImpl extends AbstractEdgeRepair implements EdgeDeleteRepair {
+
+
+    @Inject
+    public EdgeDeleteRepairImpl( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
+                                 final Keyspace keyspace, final Scheduler scheduler ) {
+        super( edgeSerialization, graphFig, keyspace, scheduler );
+    }
+
+
+    @Override
+    public Observable<MarkedEdge> repair( final OrganizationScope scope, final Edge edge ) {
+
+        return super.repair( scope, edge );
+    }
+
+
+    @Override
+    protected Func1<MarkedEdge, Boolean> getFilter( final UUID maxVersion ) {
+        return new Func1<MarkedEdge, Boolean>() {
+            /**
+             * We only want to return edges <= this version so we remove them
+             * @param markedEdge
+             * @return
+             */
+            @Override
+            public Boolean call( final MarkedEdge markedEdge ) {
+                return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 1;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
index 548c0e9..e8420a8 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
@@ -20,24 +20,18 @@
 package org.apache.usergrid.persistence.graph.impl.stage;
 
 
-import java.util.Iterator;
-import java.util.List;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.collection.OrganizationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
 
 import com.fasterxml.uuid.UUIDComparator;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.Scheduler;
@@ -46,41 +40,28 @@ import rx.functions.Func1;
 
 /**
  * SimpleRepair operation
- *
  */
 @Singleton
-public class EdgeWriteRepairImpl implements EdgeWriteRepair {
-
-    protected final EdgeSerialization edgeSerialization;
-    protected final GraphFig graphFig;
-    protected final Keyspace keyspace;
-    protected final Scheduler scheduler;
+public class EdgeWriteRepairImpl extends AbstractEdgeRepair implements EdgeWriteRepair {
 
 
     @Inject
     public EdgeWriteRepairImpl( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
                                 final Keyspace keyspace, final Scheduler scheduler ) {
-        this.edgeSerialization = edgeSerialization;
-        this.graphFig = graphFig;
-        this.keyspace = keyspace;
-        this.scheduler = scheduler;
+        super( edgeSerialization, graphFig, keyspace, scheduler );
     }
 
 
     @Override
     public Observable<MarkedEdge> repair( final OrganizationScope scope, final Edge edge ) {
 
-        final UUID maxVersion = edge.getVersion();
-
-        //get source edges
-        Observable<MarkedEdge> sourceEdges = getEdgeVersionsFromSource( scope, edge );
-
-        //get target edges
-        Observable<MarkedEdge> targetEdges = getEdgeVersionsToTarget( scope, edge );
+        return super.repair( scope, edge );
+    }
 
 
-        //merge source and target then deal with the distinct values
-        return Observable.merge( sourceEdges, targetEdges ).filter( new Func1<MarkedEdge, Boolean>() {
+    @Override
+    protected Func1<MarkedEdge, Boolean> getFilter( final UUID maxVersion ) {
+        return new Func1<MarkedEdge, Boolean>() {
             /**
              * We only want to return edges < this version so we remove them
              * @param markedEdge
@@ -90,66 +71,6 @@ public class EdgeWriteRepairImpl implements EdgeWriteRepair {
             public Boolean call( final MarkedEdge markedEdge ) {
                 return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
             }
-            //buffer the deletes and issue them in a single mutation
-        } ).distinctUntilChanged().buffer( graphFig.getScanPageSize() )
-                         .flatMap( new Func1<List<MarkedEdge>, Observable<MarkedEdge>>() {
-                             @Override
-                             public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
-                                 final MutationBatch batch = keyspace.prepareMutationBatch();
-
-                                 for ( MarkedEdge edge : markedEdges ) {
-                                     final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
-
-                                     batch.mergeShallow( delete );
-                                 }
-
-                                 try {
-                                     batch.execute();
-                                 }
-                                 catch ( ConnectionException e ) {
-                                     throw new RuntimeException( "Unable to issue write to cassandra", e );
-                                 }
-
-                                 return Observable.from( markedEdges ).subscribeOn( scheduler );
-                             }
-             } );
-    }
-
-
-    /**
-     * Get all edge versions <= the specified max from the source
-     */
-    private Observable<MarkedEdge> getEdgeVersionsFromSource( final OrganizationScope scope, final Edge edge ) {
-
-        return Observable.create( new ObservableIterator<MarkedEdge>() {
-            @Override
-            protected Iterator<MarkedEdge> getIterator() {
-
-                final SimpleSearchByEdge search =
-                        new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
-                                edge.getVersion(), null );
-
-                return edgeSerialization.getEdgeFromSource( scope, search );
-            }
-        } ).subscribeOn( scheduler );
-    }
-
-
-    /**
-     * Get all edge versions <= the specified max from the source
-     */
-    private Observable<MarkedEdge> getEdgeVersionsToTarget( final OrganizationScope scope, final Edge edge ) {
-
-        return Observable.create( new ObservableIterator<MarkedEdge>() {
-            @Override
-            protected Iterator<MarkedEdge> getIterator() {
-
-                final SimpleSearchByEdge search =
-                        new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
-                                edge.getVersion(), null );
-
-                return edgeSerialization.getEdgeToTarget( scope, search );
-            }
-        } ).subscribeOn( scheduler );
+        };
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
new file mode 100644
index 0000000..e8eedc9
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jukito.JukitoRunner;
+import org.jukito.UseModules;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ *
+ *
+ */
+@RunWith( JukitoRunner.class )
+@UseModules( { TestGraphModule.class } )
+public class EdgeDeleteRepairTest {
+
+    @ClassRule
+    public static CassandraRule rule = new CassandraRule();
+
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
+    @Inject
+    protected EdgeSerialization edgeSerialization;
+
+    @Inject
+    protected EdgeDeleteRepair edgeDeleteRepair;
+
+
+    protected OrganizationScope scope;
+
+
+    @Before
+    public void setup() {
+        scope = mock( OrganizationScope.class );
+
+        Id orgId = mock( Id.class );
+
+        when( orgId.getType() ).thenReturn( "organization" );
+        when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+        when( scope.getOrganization() ).thenReturn( orgId );
+    }
+
+
+    /**
+     * Test repairing with no edges
+     */
+    @Test
+    public void noEdges() {
+        Edge edge = createEdge( "source", "test", "target" );
+
+        Iterator<MarkedEdge> edges = edgeDeleteRepair.repair( scope, edge ).toBlockingObservable().getIterator();
+
+        assertFalse( "No edges cleaned", edges.hasNext() );
+    }
+
+
+    /**
+     * Test that repairing removes every version <= the selected edge version and keeps the newer versions
+     * TODO: TN.  There appears to be a race condition here with ordering.  Not sure if this is intentional as part of the impl
+     * or if it's an issue
+     */
+    @Test
+    public void versionTest() throws ConnectionException {
+        final int size = 10;
+
+        final List<Edge> versions = new ArrayList<Edge>( size );
+
+        final Id sourceId = createId( "source" );
+        final Id targetId = createId( "target" );
+        final String edgeType = "edge";
+
+        for ( int i = 0; i < size; i++ ) {
+            final Edge edge = createEdge( sourceId, edgeType, targetId );
+
+            versions.add( edge );
+
+            edgeSerialization.writeEdge( scope, edge ).execute();
+
+            System.out.println(String.format("[%d] %s", i, edge));
+        }
+
+
+        int deleteIndex = size / 2;
+
+        Edge keep = versions.get( deleteIndex );
+
+        Iterable<MarkedEdge> edges = edgeDeleteRepair.repair( scope, keep ).toBlockingObservable().toIterable();
+
+
+        int index = 0;
+
+        for ( MarkedEdge edge : edges ) {
+
+            final Edge removed = versions.get( deleteIndex - index );
+
+            assertEquals( "Removed matches saved index", removed, edge );
+
+            index++;
+        }
+
+        //now verify we get all the versions we expect back
+        Iterator<MarkedEdge> iterator = edgeSerialization.getEdgeFromSource( scope,
+                new SimpleSearchByEdge( sourceId, edgeType, targetId, UUIDGenerator.newTimeUUID(), null ) );
+
+        index = 0;
+
+        for(MarkedEdge edge: new IterableWrapper<MarkedEdge>( iterator )){
+
+            final Edge saved = versions.get( size - index - 1);
+
+            assertEquals(saved, edge);
+
+            index++;
+        }
+
+        final int keptCount = size-deleteIndex;
+
+        assertEquals("Kept edge version was the minimum", keptCount, index+1);
+    }
+
+
+    private class IterableWrapper<T> implements Iterable<T> {
+        private final Iterator<T> sourceIterator;
+
+
+        private IterableWrapper( final Iterator<T> sourceIterator ) {
+            this.sourceIterator = sourceIterator;
+        }
+
+
+        @Override
+        public Iterator<T> iterator() {
+            return this.sourceIterator;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
index 352639e..f796c85 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
@@ -109,6 +109,8 @@ public class EdgeWriteRepairTest {
 
     /**
      * Test repairing with no edges
+     * TODO: TN.  There appears to be a race condition here with ordering.  Not sure if this is intentional as part of the impl
+     * or if it's an issue
      */
     @Test
     public void versionTest() throws ConnectionException {