You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/25 16:21:13 UTC
[14/43] git commit: Finished serialization I/O impls
Finished serialization I/O impls
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/078ca7dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/078ca7dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/078ca7dc
Branch: refs/heads/two-dot-o
Commit: 078ca7dca71ce62b7a5ae0469f7485563582145c
Parents: 96e6f5f
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 14 15:14:11 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 14 15:14:11 2014 -0700
----------------------------------------------------------------------
.../persistence/graph/guice/GraphModule.java | 4 +
.../graph/impl/stage/AbstractEdgeRepair.java | 152 +++++++++++++++
.../graph/impl/stage/EdgeDeleteRepairImpl.java | 76 ++++++++
.../graph/impl/stage/EdgeWriteRepairImpl.java | 95 +---------
.../graph/impl/stage/EdgeDeleteRepairTest.java | 189 +++++++++++++++++++
.../graph/impl/stage/EdgeWriteRepairTest.java | 2 +
6 files changed, 431 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index c81dc82..a5dda9a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -34,6 +34,8 @@ import org.apache.usergrid.persistence.graph.consistency.LocalTimeoutQueue;
import org.apache.usergrid.persistence.graph.consistency.TimeoutQueue;
import org.apache.usergrid.persistence.graph.impl.CollectionIndexObserver;
import org.apache.usergrid.persistence.graph.impl.EdgeManagerImpl;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteRepair;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteRepairImpl;
import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepair;
import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepairImpl;
import org.apache.usergrid.persistence.graph.impl.stage.EdgeWriteRepair;
@@ -107,6 +109,8 @@ public class GraphModule extends AbstractModule {
bind( EdgeMetaRepair.class).to( EdgeMetaRepairImpl.class );
bind( EdgeWriteRepair.class).to( EdgeWriteRepairImpl.class );
+
+ bind( EdgeDeleteRepair.class).to( EdgeDeleteRepairImpl.class );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
new file mode 100644
index 0000000..838de69
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Func1;
+
+
+/**
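+ * Base class for edge repair: reads an edge's versions from source and target, keeps those selected by getFilter, and deletes them in batches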
+ *
+ */
+@Singleton
+public abstract class AbstractEdgeRepair {
+
+ protected final EdgeSerialization edgeSerialization;
+ protected final GraphFig graphFig;
+ protected final Keyspace keyspace;
+ protected final Scheduler scheduler;
+
+
+ @Inject
+ public AbstractEdgeRepair( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
+ final Keyspace keyspace, final Scheduler scheduler ) {
+ this.edgeSerialization = edgeSerialization;
+ this.graphFig = graphFig;
+ this.keyspace = keyspace;
+ this.scheduler = scheduler;
+ }
+
+
+
+ public Observable<MarkedEdge> repair( final OrganizationScope scope, final Edge edge ) {
+
+ final UUID maxVersion = edge.getVersion();
+
+ //get source edges
+ Observable<MarkedEdge> sourceEdges = getEdgeVersionsFromSource( scope, edge );
+
+ //get target edges
+ Observable<MarkedEdge> targetEdges = getEdgeVersionsToTarget( scope, edge );
+
+
+ //merge source and target then deal with the distinct values
+ return Observable.merge( sourceEdges, targetEdges ).filter(getFilter(maxVersion)).distinctUntilChanged().buffer( graphFig.getScanPageSize() )
+ .flatMap( new Func1<List<MarkedEdge>, Observable<MarkedEdge>>() {
+ @Override
+ public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+
+ for ( MarkedEdge edge : markedEdges ) {
+ final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
+
+ batch.mergeShallow( delete );
+ }
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to issue write to cassandra", e );
+ }
+
+ return Observable.from( markedEdges ).subscribeOn( scheduler );
+ }
+ } );
+ }
+
+
+ /**
+ * A filter used to filter out edges appropriately for the max version
+ *
+ * @param maxVersion the max version to use
+ * @return The filter to use in the max version
+ */
+ protected abstract Func1<MarkedEdge, Boolean> getFilter(final UUID maxVersion);
+
+ /**
+ * Get all edge versions <= the specified max from the source
+ */
+ private Observable<MarkedEdge> getEdgeVersionsFromSource( final OrganizationScope scope, final Edge edge ) {
+
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+
+ final SimpleSearchByEdge search =
+ new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
+ edge.getVersion(), null );
+
+ return edgeSerialization.getEdgeFromSource( scope, search );
+ }
+ } ).subscribeOn( scheduler );
+ }
+
+
+ /**
+ * Get all edge versions <= the specified max to the target
+ */
+ private Observable<MarkedEdge> getEdgeVersionsToTarget( final OrganizationScope scope, final Edge edge ) {
+
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+
+ final SimpleSearchByEdge search =
+ new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
+ edge.getVersion(), null );
+
+ return edgeSerialization.getEdgeToTarget( scope, search );
+ }
+ } ).subscribeOn( scheduler );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
new file mode 100644
index 0000000..bf5d3d2
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Func1;
+
+
+/**
+ * Edge repair for deletes: selects every version of the edge <= the supplied version for removal
+ */
+@Singleton
+public class EdgeDeleteRepairImpl extends AbstractEdgeRepair implements EdgeDeleteRepair {
+
+
+ @Inject
+ public EdgeDeleteRepairImpl( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
+ final Keyspace keyspace, final Scheduler scheduler ) {
+ super( edgeSerialization, graphFig, keyspace, scheduler );
+ }
+
+
+ @Override
+ public Observable<MarkedEdge> repair( final OrganizationScope scope, final Edge edge ) {
+
+ return super.repair( scope, edge );
+ }
+
+
+ @Override
+ protected Func1<MarkedEdge, Boolean> getFilter( final UUID maxVersion ) {
+ return new Func1<MarkedEdge, Boolean>() {
+ /**
+ * We only want to return edges <= this version so we remove them
+ * @param markedEdge
+ * @return
+ */
+ @Override
+ public Boolean call( final MarkedEdge markedEdge ) {
+ return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 1;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
index 548c0e9..e8420a8 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
@@ -20,24 +20,18 @@
package org.apache.usergrid.persistence.graph.impl.stage;
-import java.util.Iterator;
-import java.util.List;
import java.util.UUID;
import org.apache.usergrid.persistence.collection.OrganizationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
import com.fasterxml.uuid.UUIDComparator;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.Scheduler;
@@ -46,41 +40,28 @@ import rx.functions.Func1;
/**
* SimpleRepair operation
- *
*/
@Singleton
-public class EdgeWriteRepairImpl implements EdgeWriteRepair {
-
- protected final EdgeSerialization edgeSerialization;
- protected final GraphFig graphFig;
- protected final Keyspace keyspace;
- protected final Scheduler scheduler;
+public class EdgeWriteRepairImpl extends AbstractEdgeRepair implements EdgeWriteRepair {
@Inject
public EdgeWriteRepairImpl( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
final Keyspace keyspace, final Scheduler scheduler ) {
- this.edgeSerialization = edgeSerialization;
- this.graphFig = graphFig;
- this.keyspace = keyspace;
- this.scheduler = scheduler;
+ super( edgeSerialization, graphFig, keyspace, scheduler );
}
@Override
public Observable<MarkedEdge> repair( final OrganizationScope scope, final Edge edge ) {
- final UUID maxVersion = edge.getVersion();
-
- //get source edges
- Observable<MarkedEdge> sourceEdges = getEdgeVersionsFromSource( scope, edge );
-
- //get target edges
- Observable<MarkedEdge> targetEdges = getEdgeVersionsToTarget( scope, edge );
+ return super.repair( scope, edge );
+ }
- //merge source and target then deal with the distinct values
- return Observable.merge( sourceEdges, targetEdges ).filter( new Func1<MarkedEdge, Boolean>() {
+ @Override
+ protected Func1<MarkedEdge, Boolean> getFilter( final UUID maxVersion ) {
+ return new Func1<MarkedEdge, Boolean>() {
/**
* We only want to return edges < this version so we remove them
* @param markedEdge
@@ -90,66 +71,6 @@ public class EdgeWriteRepairImpl implements EdgeWriteRepair {
public Boolean call( final MarkedEdge markedEdge ) {
return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
}
- //buffer the deletes and issue them in a single mutation
- } ).distinctUntilChanged().buffer( graphFig.getScanPageSize() )
- .flatMap( new Func1<List<MarkedEdge>, Observable<MarkedEdge>>() {
- @Override
- public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
- final MutationBatch batch = keyspace.prepareMutationBatch();
-
- for ( MarkedEdge edge : markedEdges ) {
- final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
-
- batch.mergeShallow( delete );
- }
-
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to issue write to cassandra", e );
- }
-
- return Observable.from( markedEdges ).subscribeOn( scheduler );
- }
- } );
- }
-
-
- /**
- * Get all edge versions <= the specified max from the source
- */
- private Observable<MarkedEdge> getEdgeVersionsFromSource( final OrganizationScope scope, final Edge edge ) {
-
- return Observable.create( new ObservableIterator<MarkedEdge>() {
- @Override
- protected Iterator<MarkedEdge> getIterator() {
-
- final SimpleSearchByEdge search =
- new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
- edge.getVersion(), null );
-
- return edgeSerialization.getEdgeFromSource( scope, search );
- }
- } ).subscribeOn( scheduler );
- }
-
-
- /**
- * Get all edge versions <= the specified max from the source
- */
- private Observable<MarkedEdge> getEdgeVersionsToTarget( final OrganizationScope scope, final Edge edge ) {
-
- return Observable.create( new ObservableIterator<MarkedEdge>() {
- @Override
- protected Iterator<MarkedEdge> getIterator() {
-
- final SimpleSearchByEdge search =
- new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
- edge.getVersion(), null );
-
- return edgeSerialization.getEdgeToTarget( scope, search );
- }
- } ).subscribeOn( scheduler );
+ };
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
new file mode 100644
index 0000000..e8eedc9
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jukito.JukitoRunner;
+import org.jukito.UseModules;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ *
+ *
+ */
+@RunWith( JukitoRunner.class )
+@UseModules( { TestGraphModule.class } )
+public class EdgeDeleteRepairTest {
+
+ @ClassRule
+ public static CassandraRule rule = new CassandraRule();
+
+
+ @Inject
+ @Rule
+ public MigrationManagerRule migrationManagerRule;
+
+
+ @Inject
+ protected EdgeSerialization edgeSerialization;
+
+ @Inject
+ protected EdgeDeleteRepair edgeDeleteRepair;
+
+
+ protected OrganizationScope scope;
+
+
+ @Before
+ public void setup() {
+ scope = mock( OrganizationScope.class );
+
+ Id orgId = mock( Id.class );
+
+ when( orgId.getType() ).thenReturn( "organization" );
+ when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+ when( scope.getOrganization() ).thenReturn( orgId );
+ }
+
+
+ /**
+ * Test repairing with no edges
+ */
+ @Test
+ public void noEdges() {
+ Edge edge = createEdge( "source", "test", "target" );
+
+ Iterator<MarkedEdge> edges = edgeDeleteRepair.repair( scope, edge ).toBlockingObservable().getIterator();
+
+ assertFalse( "No edges cleaned", edges.hasNext() );
+ }
+
+
+ /**
+ * Test repairing when multiple versions of the same edge have been written
+ * TODO: TN. There appears to be a race condition here with ordering. Not sure if this is intentional as part of the impl
+ * or if it's an issue
+ */
+ @Test
+ public void versionTest() throws ConnectionException {
+ final int size = 10;
+
+ final List<Edge> versions = new ArrayList<Edge>( size );
+
+ final Id sourceId = createId( "source" );
+ final Id targetId = createId( "target" );
+ final String edgeType = "edge";
+
+ for ( int i = 0; i < size; i++ ) {
+ final Edge edge = createEdge( sourceId, edgeType, targetId );
+
+ versions.add( edge );
+
+ edgeSerialization.writeEdge( scope, edge ).execute();
+
+ System.out.println(String.format("[%d] %s", i, edge));
+ }
+
+
+ int deleteIndex = size / 2;
+
+ Edge keep = versions.get( deleteIndex );
+
+ Iterable<MarkedEdge> edges = edgeDeleteRepair.repair( scope, keep ).toBlockingObservable().toIterable();
+
+
+ int index = 0;
+
+ for ( MarkedEdge edge : edges ) {
+
+ final Edge removed = versions.get( deleteIndex - index );
+
+ assertEquals( "Removed matches saved index", removed, edge );
+
+ index++;
+ }
+
+ //now verify we get all the versions we expect back
+ Iterator<MarkedEdge> iterator = edgeSerialization.getEdgeFromSource( scope,
+ new SimpleSearchByEdge( sourceId, edgeType, targetId, UUIDGenerator.newTimeUUID(), null ) );
+
+ index = 0;
+
+ for(MarkedEdge edge: new IterableWrapper<MarkedEdge>( iterator )){
+
+ final Edge saved = versions.get( size - index - 1);
+
+ assertEquals(saved, edge);
+
+ index++;
+ }
+
+ final int keptCount = size-deleteIndex;
+
+ assertEquals("Kept edge version was the minimum", keptCount, index+1);
+ }
+
+
+ private class IterableWrapper<T> implements Iterable<T> {
+ private final Iterator<T> sourceIterator;
+
+
+ private IterableWrapper( final Iterator<T> sourceIterator ) {
+ this.sourceIterator = sourceIterator;
+ }
+
+
+ @Override
+ public Iterator<T> iterator() {
+ return this.sourceIterator;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
index 352639e..f796c85 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
@@ -109,6 +109,8 @@ public class EdgeWriteRepairTest {
/**
* Test repairing with no edges
+ * TODO: TN. There appears to be a race condition here with ordering. Not sure if this is intentional as part of the impl
+ * or if it's an issue
*/
@Test
public void versionTest() throws ConnectionException {