You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/14 23:21:03 UTC

[1/2] git commit: Temporary commit before adding delete repair

Repository: incubator-usergrid
Updated Branches:
  refs/heads/asyncqueue 78dc432d1 -> 078ca7dca


Temporary commit before adding delete repair


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/96e6f5fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/96e6f5fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/96e6f5fc

Branch: refs/heads/asyncqueue
Commit: 96e6f5fcb1ae332b9bbd333bc52b4aab613300f6
Parents: 78dc432
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 14 11:38:07 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 14 11:38:07 2014 -0700

----------------------------------------------------------------------
 .../persistence/graph/guice/GraphModule.java    |  19 +-
 .../persistence/graph/impl/stage/EdgeAsync.java |  62 ---
 .../graph/impl/stage/EdgeAsyncImpl.java         | 261 -----------
 .../graph/impl/stage/EdgeDeleteRepair.java      |  41 ++
 .../graph/impl/stage/EdgeMetaRepair.java        |  62 +++
 .../graph/impl/stage/EdgeMetaRepairImpl.java    | 331 ++++++++++++++
 .../graph/impl/stage/EdgeWriteRepair.java       |  41 ++
 .../graph/impl/stage/EdgeWriteRepairImpl.java   | 155 +++++++
 .../graph/impl/stage/EdgeAsyncTest.java         | 287 ------------
 .../graph/impl/stage/EdgeMetaRepairTest.java    | 451 +++++++++++++++++++
 .../graph/impl/stage/EdgeWriteRepairTest.java   | 185 ++++++++
 11 files changed, 1274 insertions(+), 621 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96e6f5fc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index a18e741..c81dc82 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -34,8 +34,10 @@ import org.apache.usergrid.persistence.graph.consistency.LocalTimeoutQueue;
 import org.apache.usergrid.persistence.graph.consistency.TimeoutQueue;
 import org.apache.usergrid.persistence.graph.impl.CollectionIndexObserver;
 import org.apache.usergrid.persistence.graph.impl.EdgeManagerImpl;
-import org.apache.usergrid.persistence.graph.impl.stage.EdgeAsync;
-import org.apache.usergrid.persistence.graph.impl.stage.EdgeAsyncImpl;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepair;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepairImpl;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeWriteRepair;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeWriteRepairImpl;
 import org.apache.usergrid.persistence.graph.serialization.CassandraConfig;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
@@ -45,16 +47,9 @@ import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSeri
 import org.apache.usergrid.persistence.graph.serialization.impl.EdgeSerializationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.NodeSerializationImpl;
 
-import com.google.common.eventbus.EventBus;
 import com.google.inject.AbstractModule;
-import com.google.inject.Provides;
-import com.google.inject.TypeLiteral;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
-import com.google.inject.matcher.Matchers;
 import com.google.inject.multibindings.Multibinder;
-import com.google.inject.spi.InjectionListener;
-import com.google.inject.spi.TypeEncounter;
-import com.google.inject.spi.TypeListener;
 
 
 /**
@@ -108,8 +103,10 @@ public class GraphModule extends AbstractModule {
         bind(AsyncProcessor.class).annotatedWith( EdgeWrite.class ).to( AsyncProcessorImpl.class );
         bind(AsyncProcessor.class).annotatedWith( NodeDelete.class ).to( AsyncProcessorImpl.class );
 
-        //Edge stages
-        bind( EdgeAsync.class).to( EdgeAsyncImpl.class );
+        //Repair/cleanup classes
+        bind( EdgeMetaRepair.class).to( EdgeMetaRepairImpl.class );
+
+        bind( EdgeWriteRepair.class).to( EdgeWriteRepairImpl.class );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96e6f5fc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java
deleted file mode 100644
index 90c1bfe..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsync.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.impl.stage;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.netflix.astyanax.MutationBatch;
-
-import rx.Observable;
-
-
-/**
- * Creates a mutation which will remove obsolete
- *
- */
-public interface EdgeAsync {
-
-    /**
-     * Validate that the source types can be cleaned for the given info
-     * @param scope The scope to use
-     * @param sourceId The source Id to use
-     * @param edgeType The edge type
-     * @param version The max version to clean
-     * @return The mutation with the operations
-     */
-    public Observable<Integer> cleanSources(OrganizationScope scope, Id sourceId, String edgeType, UUID version);
-
-
-    /**
-     *
-     * Remove all source id types that are empty, as well as the edge type if there are no more edges for it
-     * @param scope The scope to use
-     * @param targetId The target Id to use
-     * @param edgeType The edge type
-     * @param version The max version to clean
-     * @return  The mutation with the operations
-     */
-    public Observable<Integer> clearTargets( OrganizationScope scope, Id targetId, String edgeType, UUID version );
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96e6f5fc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
deleted file mode 100644
index ba21bf0..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.impl.stage;
-
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.collection.mvcc.entity.ValidationUtils;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.SearchByIdType;
-import org.apache.usergrid.persistence.graph.SearchEdgeType;
-import org.apache.usergrid.persistence.graph.SearchIdType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
-import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
-import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import rx.Observable;
-import rx.Scheduler;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.observables.MathObservable;
-
-
-/**
- * Implementation of the cleanup of edge meta data
- */
-@Singleton
-public class EdgeAsyncImpl implements EdgeAsync {
-
-
-    private final EdgeMetadataSerialization edgeMetadataSerialization;
-    private final EdgeSerialization edgeSerialization;
-    private final Keyspace keyspace;
-    private final GraphFig graphFig;
-    private final Scheduler scheduler;
-
-
-    @Inject
-    public EdgeAsyncImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
-                          final EdgeSerialization edgeSerialization, final Keyspace keyspace, final GraphFig graphFig,
-                          final Scheduler scheduler ) {
-        this.edgeMetadataSerialization = edgeMetadataSerialization;
-        this.edgeSerialization = edgeSerialization;
-        this.keyspace = keyspace;
-        this.graphFig = graphFig;
-        this.scheduler = scheduler;
-    }
-
-
-    @Override
-    public Observable<Integer> cleanSources( final OrganizationScope scope, final Id sourceId, final String edgeType,
-                                             final UUID version ) {
-
-
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-
-    @Override
-    public Observable<Integer> clearTargets( final OrganizationScope scope, final Id targetId, final String edgeType,
-                                             final UUID version ) {
-
-        ValidationUtils.validateOrganizationScope( scope );
-        ValidationUtils.verifyIdentity( targetId );
-        Preconditions.checkNotNull( edgeType, "edge type is required" );
-        Preconditions.checkNotNull( version, "version is required" );
-
-
-        Observable<Integer> deleteCounts =
-                loadEdgeIdsToTarget( scope, new SimpleSearchIdType( targetId, edgeType, null ) )
-                        .buffer( graphFig.getRepairConcurrentSize() )
-                                //buffer them into concurrent groups based on the concurrent repair size
-                        .flatMap( new Func1<List<String>, Observable<Integer>>() {
-
-                            @Override
-                            public Observable<Integer> call( final List<String> types ) {
-
-
-                                final MutationBatch batch = keyspace.prepareMutationBatch();
-
-                                final List<Observable<Integer>> checks =
-                                        new ArrayList<Observable<Integer>>( types.size() );
-
-                                //for each id type, check if the exist in parallel to increase processing speed
-                                for ( final String sourceIdType : types ) {
-
-                                    final SearchByIdType searchData =
-                                            new SimpleSearchByIdType( targetId, edgeType, version, sourceIdType, null );
-
-                                    Observable<Integer> search = getEdgesToTargetBySourceType( scope, searchData )
-                                            .distinctUntilChanged( new Func1<MarkedEdge, Id>() {
-
-                                                //get distinct by source node
-                                                @Override
-                                                public Id call( final MarkedEdge markedEdge ) {
-                                                    return markedEdge.getSourceNode();
-                                                }
-                                            } ).take( 1 ).count().doOnNext( new Action1<Integer>() {
-
-                                                @Override
-                                                public void call( final Integer count ) {
-                                                    /**
-                                                     * we only want to delete if no edges are in this class. If there
-                                                     * are
-                                                     * still edges
-                                                     * we must retain the information in order to keep our index
-                                                     * structure
-                                                     * correct for edge
-                                                     * iteration
-                                                     **/
-                                                    if ( count != 0 ) {
-                                                        return;
-                                                    }
-
-
-                                                    batch.mergeShallow( edgeMetadataSerialization
-                                                            .removeIdTypeToTarget( scope, targetId, edgeType,
-                                                                    sourceIdType, version ) );
-                                                }
-                                            } );
-
-                                    checks.add( search );
-                                }
-
-
-                                /**
-                                 * Sum up the total number of edges we had, then execute the mutation if we have
-                                 * anything to do
-                                 */
-                                return MathObservable.sumInteger( Observable.merge( checks ) )
-                                                     .doOnNext( new Action1<Integer>() {
-                                                         @Override
-                                                         public void call( final Integer count ) {
-
-                                                             if ( batch.isEmpty() ) {
-                                                                 return;
-                                                             }
-
-                                                             try {
-                                                                 batch.execute();
-                                                             }
-                                                             catch ( ConnectionException e ) {
-                                                                 throw new RuntimeException(
-                                                                         "Unable to execute mutation", e );
-                                                             }
-                                                         }
-                                                     } );
-                            }
-                        } )
-                                //if we get no edges, emit a 0 so the caller knows nothing was deleted
-                        .defaultIfEmpty( 0 );
-
-
-        //sum up everything emitted by sub types.  If there's no edges existing for all sub types,
-        // then we can safely remove them
-        return MathObservable.sumInteger( deleteCounts ).map( new Func1<Integer, Integer>() {
-            @Override
-            public Integer call( final Integer subTypes ) {
-
-                /**
-                 * We can only execute deleting this type if no sub types were deleted
-                 */
-                if ( subTypes != 0 ) {
-                    return subTypes;
-                }
-
-                try {
-                    edgeMetadataSerialization.removeEdgeTypeToTarget( scope, targetId, edgeType, version ).execute();
-                }
-                catch ( ConnectionException e ) {
-                    throw new RuntimeException( "Unable to execute mutation" );
-                }
-
-                return subTypes;
-            }
-        } );
-    }
-
-
-    /**
-     * Get all existing edge types to the target node
-     */
-    private Observable<String> getEdgesTypesToTarget( final OrganizationScope scope, final SearchEdgeType search ) {
-
-        return Observable.create( new ObservableIterator<String>() {
-            @Override
-            protected Iterator<String> getIterator() {
-                return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
-            }
-        } ).subscribeOn( scheduler );
-    }
-
-
-    private Observable<MarkedEdge> getEdgesToTargetBySourceType( final OrganizationScope scope,
-                                                                 final SearchByIdType search ) {
-
-        return Observable.create( new ObservableIterator<MarkedEdge>() {
-            @Override
-            protected Iterator<MarkedEdge> getIterator() {
-                return edgeSerialization.getEdgesToTargetBySourceType( scope, search );
-            }
-        } ).subscribeOn( scheduler );
-    }
-
-
-    private Observable<String> loadEdgeIdsToTarget( final OrganizationScope scope, final SearchIdType search ) {
-        return Observable.create( new ObservableIterator<String>() {
-            @Override
-            protected Iterator<String> getIterator() {
-                return edgeMetadataSerialization.getIdTypesToTarget( scope, search );
-            }
-        } ).subscribeOn( scheduler );
-    }
-
-
-    /**
-     * Load all edges pointing to this target
-     */
-    private Observable<MarkedEdge> loadEdgesToTarget( final OrganizationScope scope, final SearchByEdgeType search ) {
-
-        return Observable.create( new ObservableIterator<MarkedEdge>() {
-            @Override
-            protected Iterator<MarkedEdge> getIterator() {
-                return edgeSerialization.getEdgesToTarget( scope, search );
-            }
-        } ).subscribeOn( scheduler );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96e6f5fc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepair.java
new file mode 100644
index 0000000..487472c
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepair.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+
+import rx.Observable;
+
+
+/**
+ * Interface to perform repair operations on an edge when it is written
+ */
+public interface EdgeDeleteRepair {
+
+    /**
+     * Repair this edge.  Remove previous entries including this one
+     * @param scope
+     * @param edge
+     */
+    public Observable<MarkedEdge> repair( OrganizationScope scope, Edge edge );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96e6f5fc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepair.java
new file mode 100644
index 0000000..31207d8
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepair.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Audits edge meta data and removes them if they're obscelete
+ */
+public interface EdgeMetaRepair {
+
+    /**
+     * Validate that the source types can be cleaned for the given info
+     *
+     * @param scope The scope to use
+     * @param sourceId The source Id to use
+     * @param edgeType The edge type
+     * @param version The max version to clean
+     *
+     * @return An observable that emits the total number of sub types still in use.  0 implies the type and subtypes
+     *         have been removed.  Anything > 0 implies the edgeType and subTypes are still in use
+     */
+    public Observable<Integer> repairSources( OrganizationScope scope, Id sourceId, String edgeType, UUID version );
+
+
+    /**
+     * Remove all source id types that are empty, as well as the edge type if there are no more edges for it
+     *
+     * @param scope The scope to use
+     * @param targetId The target Id to use
+     * @param edgeType The edge type
+     * @param version The max version to clean
+     *
+     * @return An observable that emits the total number of sub types still in use.  0 implies the type and subtypes
+     *         have been removed.  Anything > 0 implies the edgeType and subTypes are still in use
+     */
+    public Observable<Integer> repairTargets( OrganizationScope scope, Id targetId, String edgeType, UUID version );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96e6f5fc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
new file mode 100644
index 0000000..51777ab
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.mvcc.entity.ValidationUtils;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.observables.MathObservable;
+
+
+/**
+ * Implementation of the cleanup of edge meta data
+ */
+@Singleton
+public class EdgeMetaRepairImpl implements EdgeMetaRepair {
+
+
+    private final EdgeMetadataSerialization edgeMetadataSerialization;
+    private final EdgeSerialization edgeSerialization;
+    private final Keyspace keyspace;
+    private final GraphFig graphFig;
+    private final Scheduler scheduler;
+
+
+    @Inject
+    public EdgeMetaRepairImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
+                               final EdgeSerialization edgeSerialization, final Keyspace keyspace,
+                               final GraphFig graphFig, final Scheduler scheduler ) {
+        this.edgeMetadataSerialization = edgeMetadataSerialization;
+        this.edgeSerialization = edgeSerialization;
+        this.keyspace = keyspace;
+        this.graphFig = graphFig;
+        this.scheduler = scheduler;
+    }
+
+
+    @Override
+    public Observable<Integer> repairSources( final OrganizationScope scope, final Id sourceId, final String edgeType,
+                                              final UUID version ) {
+
+
+        return clearTypes( scope, sourceId, edgeType, version, source );
+    }
+
+
+    @Override
+    public Observable<Integer> repairTargets( final OrganizationScope scope, final Id targetId, final String edgeType,
+                                              final UUID version ) {
+        return clearTypes( scope, targetId, edgeType, version, target );
+    }
+
+
+    private Observable<Integer> clearTypes( final OrganizationScope scope, final Id node, final String edgeType,
+                                            final UUID version, final CleanSerialization serialization ) {
+
+        ValidationUtils.validateOrganizationScope( scope );
+        ValidationUtils.verifyIdentity( node );
+        Preconditions.checkNotNull( edgeType, "edge type is required" );
+        Preconditions.checkNotNull( version, "version is required" );
+
+
+        Observable<Integer> deleteCounts = serialization.loadEdgeSubTypes( scope, node, edgeType, version )
+                .buffer( graphFig.getRepairConcurrentSize() )
+                        //buffer them into concurrent groups based on the concurrent repair size
+                .flatMap( new Func1<List<String>, Observable<Integer>>() {
+
+                    @Override
+                    public Observable<Integer> call( final List<String> types ) {
+
+
+                        final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                        final List<Observable<Integer>> checks = new ArrayList<Observable<Integer>>( types.size() );
+
+                        //for each id type, check if the exist in parallel to increase processing speed
+                        for ( final String sourceIdType : types ) {
+
+                            Observable<Integer> search =
+                                    serialization.loadEdges( scope, node, edgeType, sourceIdType, version )
+                                                 .distinctUntilChanged( new Func1<MarkedEdge, Id>() {
+
+                                                     //get distinct by source node
+                                                     @Override
+                                                     public Id call( final MarkedEdge markedEdge ) {
+                                                         return markedEdge.getSourceNode();
+                                                     }
+                                                 } ).take( 1 ).count().doOnNext( new Action1<Integer>() {
+
+                                        @Override
+                                        public void call( final Integer count ) {
+                                            /**
+                                             * we only want to delete if no edges are in this class. If there
+                                             * are
+                                             * still edges
+                                             * we must retain the information in order to keep our index
+                                             * structure
+                                             * correct for edge
+                                             * iteration
+                                             **/
+                                            if ( count != 0 ) {
+                                                return;
+                                            }
+
+                                            batch.mergeShallow( serialization
+                                                    .removeEdgeSubType( scope, node, edgeType, sourceIdType,
+                                                            version ) );
+                                        }
+                                    } );
+
+                            checks.add( search );
+                        }
+
+
+                        /**
+                         * Sum up the total number of edges we had, then execute the mutation if we have
+                         * anything to do
+                         */
+                        return MathObservable.sumInteger( Observable.merge( checks ) )
+                                             .doOnNext( new Action1<Integer>() {
+                                                 @Override
+                                                 public void call( final Integer count ) {
+
+                                                     if ( batch.isEmpty() ) {
+                                                         return;
+                                                     }
+
+                                                     try {
+                                                         batch.execute();
+                                                     }
+                                                     catch ( ConnectionException e ) {
+                                                         throw new RuntimeException( "Unable to execute mutation", e );
+                                                     }
+                                                 }
+                                             } );
+                    }
+                } )
+                        //if we get no edges, emit a 0 so the caller knows we can delete the type
+                .defaultIfEmpty( 0 );
+
+
+        //sum up everything emitted by sub types.  If there's no edges existing for all sub types,
+        // then we can safely remove them
+        return MathObservable.sumInteger( deleteCounts ).last().doOnNext( new Action1<Integer>() {
+
+            @Override
+            public void call( final Integer subTypes ) {
+
+                /**
+                 * We can only execute deleting this type if no sub types were deleted
+                 */
+                if ( subTypes != 0 ) {
+                    return;
+                }
+
+                try {
+
+                    serialization.removeEdgeType( scope, node, edgeType, version ).execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to execute mutation" );
+                }
+
+            }
+        } );
+    }
+
+
+    /**
+     * Simple edge serialization
+     */
+    private static interface CleanSerialization {
+
+        /**
+         * Load all subtypes for the edge with a version <= the provided version
+         */
+        Observable<String> loadEdgeSubTypes( final OrganizationScope scope, final Id nodeId, final String type,
+                                             final UUID version );
+
+
+        /**
+         * Load an observable with edges from the details provided
+         */
+        Observable<MarkedEdge> loadEdges( final OrganizationScope scope, final Id nodeId, final String edgeType,
+                                          final String subType, final UUID version );
+
+        /**
+         * Remove the sub type specified
+         */
+        MutationBatch removeEdgeSubType( final OrganizationScope scope, final Id nodeId, final String edgeType,
+                                         final String subType, final UUID version );
+
+        /**
+         * Remove the edge type
+         */
+        MutationBatch removeEdgeType( final OrganizationScope scope, final Id nodeId, final String type,
+                                      final UUID version );
+    }
+
+
+    /**
+     * Target serialization i/o for cleaning target edges
+     */
+    private final CleanSerialization target = new CleanSerialization() {
+
+        @Override
+        public Observable<String> loadEdgeSubTypes( final OrganizationScope scope, final Id nodeId,
+                                                    final String edgeType, final UUID version ) {
+            return Observable.create( new ObservableIterator<String>() {
+                @Override
+                protected Iterator<String> getIterator() {
+                    return edgeMetadataSerialization
+                            .getIdTypesToTarget( scope, new SimpleSearchIdType( nodeId, edgeType, null ) );
+                }
+            } ).subscribeOn( scheduler );
+        }
+
+
+        @Override
+        public Observable<MarkedEdge> loadEdges( final OrganizationScope scope, final Id nodeId, final String edgeType,
+                                                 final String subType, final UUID version ) {
+            return Observable.create( new ObservableIterator<MarkedEdge>() {
+                @Override
+                protected Iterator<MarkedEdge> getIterator() {
+                    return edgeSerialization.getEdgesToTargetBySourceType( scope,
+                            new SimpleSearchByIdType( nodeId, edgeType, version, subType, null ) );
+                }
+            } ).subscribeOn( scheduler );
+        }
+
+
+        @Override
+        public MutationBatch removeEdgeSubType( final OrganizationScope scope, final Id nodeId, final String type,
+                                                final String subType, final UUID version ) {
+            return edgeMetadataSerialization.removeIdTypeToTarget( scope, nodeId, type, subType, version );
+        }
+
+
+        @Override
+        public MutationBatch removeEdgeType( final OrganizationScope scope, final Id nodeId, final String type,
+                                             final UUID version ) {
+            return edgeMetadataSerialization.removeEdgeTypeToTarget( scope, nodeId, type, version );
+        }
+    };
+
+    /**
+     * Target serialization i/o for cleaning target edges
+     */
+    private final CleanSerialization source = new CleanSerialization() {
+
+        @Override
+        public Observable<String> loadEdgeSubTypes( final OrganizationScope scope, final Id nodeId,
+                                                    final String edgeType, final UUID version ) {
+            return Observable.create( new ObservableIterator<String>() {
+                @Override
+                protected Iterator<String> getIterator() {
+                    return edgeMetadataSerialization
+                            .getIdTypesFromSource( scope, new SimpleSearchIdType( nodeId, edgeType, null ) );
+                }
+            } ).subscribeOn( scheduler );
+        }
+
+
+        @Override
+        public Observable<MarkedEdge> loadEdges( final OrganizationScope scope, final Id nodeId, final String edgeType,
+                                                 final String subType, final UUID version ) {
+            return Observable.create( new ObservableIterator<MarkedEdge>() {
+                @Override
+                protected Iterator<MarkedEdge> getIterator() {
+                    return edgeSerialization.getEdgesFromSourceByTargetType( scope,
+                            new SimpleSearchByIdType( nodeId, edgeType, version, subType, null ) );
+                }
+            } ).subscribeOn( scheduler );
+        }
+
+
+        @Override
+        public MutationBatch removeEdgeSubType( final OrganizationScope scope, final Id nodeId, final String type,
+                                                final String subType, final UUID version ) {
+            return edgeMetadataSerialization.removeIdTypeFromSource( scope, nodeId, type, subType, version );
+        }
+
+
+        @Override
+        public MutationBatch removeEdgeType( final OrganizationScope scope, final Id nodeId, final String type,
+                                             final UUID version ) {
+            return edgeMetadataSerialization.removeEdgeTypeFromSource( scope, nodeId, type, version );
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96e6f5fc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java
new file mode 100644
index 0000000..550ddf1
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+
+import rx.Observable;
+
+
+/**
+ * Interface to perform repair operations on an edge when it is written
+ */
+public interface EdgeWriteRepair {
+
+    /**
+     * Repair this edge.  Remove previous entries
+     * @param scope
+     * @param edge
+     */
+    public Observable<MarkedEdge> repair( OrganizationScope scope, Edge edge );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96e6f5fc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
new file mode 100644
index 0000000..548c0e9
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Func1;
+
+
+/**
+ * SimpleRepair operation
+ *
+ */
+@Singleton
+public class EdgeWriteRepairImpl implements EdgeWriteRepair {
+
+    protected final EdgeSerialization edgeSerialization;
+    protected final GraphFig graphFig;
+    protected final Keyspace keyspace;
+    protected final Scheduler scheduler;
+
+
+    @Inject
+    public EdgeWriteRepairImpl( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
+                                final Keyspace keyspace, final Scheduler scheduler ) {
+        this.edgeSerialization = edgeSerialization;
+        this.graphFig = graphFig;
+        this.keyspace = keyspace;
+        this.scheduler = scheduler;
+    }
+
+
+    @Override
+    public Observable<MarkedEdge> repair( final OrganizationScope scope, final Edge edge ) {
+
+        final UUID maxVersion = edge.getVersion();
+
+        //get source edges
+        Observable<MarkedEdge> sourceEdges = getEdgeVersionsFromSource( scope, edge );
+
+        //get target edges
+        Observable<MarkedEdge> targetEdges = getEdgeVersionsToTarget( scope, edge );
+
+
+        //merge source and target then deal with the distinct values
+        return Observable.merge( sourceEdges, targetEdges ).filter( new Func1<MarkedEdge, Boolean>() {
+            /**
+             * We only want to return edges < this version so we remove them
+             * @param markedEdge
+             * @return
+             */
+            @Override
+            public Boolean call( final MarkedEdge markedEdge ) {
+                return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
+            }
+            //buffer the deletes and issue them in a single mutation
+        } ).distinctUntilChanged().buffer( graphFig.getScanPageSize() )
+                         .flatMap( new Func1<List<MarkedEdge>, Observable<MarkedEdge>>() {
+                             @Override
+                             public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
+                                 final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                                 for ( MarkedEdge edge : markedEdges ) {
+                                     final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
+
+                                     batch.mergeShallow( delete );
+                                 }
+
+                                 try {
+                                     batch.execute();
+                                 }
+                                 catch ( ConnectionException e ) {
+                                     throw new RuntimeException( "Unable to issue write to cassandra", e );
+                                 }
+
+                                 return Observable.from( markedEdges ).subscribeOn( scheduler );
+                             }
+             } );
+    }
+
+
+    /**
+     * Get all edge versions <= the specified max from the source
+     */
+    private Observable<MarkedEdge> getEdgeVersionsFromSource( final OrganizationScope scope, final Edge edge ) {
+
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+
+                final SimpleSearchByEdge search =
+                        new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
+                                edge.getVersion(), null );
+
+                return edgeSerialization.getEdgeFromSource( scope, search );
+            }
+        } ).subscribeOn( scheduler );
+    }
+
+
+    /**
+     * Get all edge versions <= the specified max from the source
+     */
+    private Observable<MarkedEdge> getEdgeVersionsToTarget( final OrganizationScope scope, final Edge edge ) {
+
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+
+                final SimpleSearchByEdge search =
+                        new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
+                                edge.getVersion(), null );
+
+                return edgeSerialization.getEdgeToTarget( scope, search );
+            }
+        } ).subscribeOn( scheduler );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96e6f5fc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
deleted file mode 100644
index 5f57e64..0000000
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.impl.stage;
-
-
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.UUID;
-
-import org.jukito.JukitoRunner;
-import org.jukito.UseModules;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
-import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
-import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
-import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.google.inject.Inject;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
-import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-/**
- *
- *
- */
-@RunWith( JukitoRunner.class )
-@UseModules( { TestGraphModule.class } )
-public class EdgeAsyncTest {
-
-
-    @ClassRule
-    public static CassandraRule rule = new CassandraRule();
-
-
-    @Inject
-    @Rule
-    public MigrationManagerRule migrationManagerRule;
-
-
-    @Inject
-    protected NodeSerialization serialization;
-
-    @Inject
-    protected EdgeAsync edgeAsync;
-
-    @Inject
-    protected EdgeSerialization edgeSerialization;
-
-    @Inject
-    protected EdgeMetadataSerialization edgeMetadataSerialization;
-
-    @Inject
-    protected GraphFig graphFig;
-
-    protected OrganizationScope scope;
-
-
-    @Before
-    public void setup() {
-        scope = mock( OrganizationScope.class );
-
-        Id orgId = mock( Id.class );
-
-        when( orgId.getType() ).thenReturn( "organization" );
-        when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
-
-        when( scope.getOrganization() ).thenReturn( orgId );
-    }
-
-
-    @Test
-    public void cleanTargetNoEdgesNoMeta() {
-        //do no writes, then execute a cleanup with no meta data
-
-        final Id targetId = createId( "target" );
-        final String test = "test";
-        final UUID version = UUIDGenerator.newTimeUUID();
-
-        int value = edgeAsync.clearTargets( scope, targetId, test, version ).toBlockingObservable().single();
-
-        assertEquals( "No subtypes found", 0, value );
-    }
-
-
-    @Test
-    public void cleanTargetSingleEdge() throws ConnectionException {
-        Edge edge = createEdge( "source", "test", "target" );
-
-        edgeSerialization.writeEdge( scope, edge ).execute();
-
-        edgeMetadataSerialization.writeEdge( scope, edge ).execute();
-
-        int value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() )
-                             .toBlockingObservable().single();
-
-        assertEquals( "No subtypes removed, edge exists", 1, value );
-
-        //now delete the edge
-
-        edgeSerialization.deleteEdge( scope, edge ).execute();
-
-        value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() )
-                         .toBlockingObservable().single();
-
-        assertEquals( "Single subtype should be removed", 0, value );
-
-        //now verify they're gone
-
-        Iterator<String> edgeTypes = edgeMetadataSerialization
-                .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( edge.getTargetNode(), null ) );
-
-        assertFalse( "No edge types exist", edgeTypes.hasNext() );
-
-
-        Iterator<String> sourceTypes = edgeMetadataSerialization
-                .getIdTypesToTarget( scope, new SimpleSearchIdType( edge.getTargetNode(), edge.getType(), null ) );
-
-        assertFalse( "No edge types exist", sourceTypes.hasNext() );
-    }
-
-
-    @Test
-    public void cleanTargetMultipleEdge() throws ConnectionException {
-
-        Id targetId = createId( "target" );
-
-        Edge edge1 = createEdge( createId("source1"), "test", targetId);
-
-
-
-        edgeSerialization.writeEdge( scope, edge1 ).execute();
-
-        edgeMetadataSerialization.writeEdge( scope, edge1 ).execute();
-
-        Edge edge2 = createEdge( createId("source2"), "test", targetId );
-
-        edgeSerialization.writeEdge( scope, edge2 ).execute();
-
-        edgeMetadataSerialization.writeEdge( scope, edge2 ).execute();
-
-        Edge edge3 = createEdge( createId("source3"), "test", targetId );
-
-        edgeSerialization.writeEdge( scope, edge3 ).execute();
-
-        edgeMetadataSerialization.writeEdge( scope, edge3 ).execute();
-
-
-        UUID cleanupVersion = UUIDGenerator.newTimeUUID();
-
-        int value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
-                             .toBlockingObservable().single();
-
-        assertEquals( "No subtypes removed, edges exist", 3, value );
-
-        //now delete the edge
-
-        edgeSerialization.deleteEdge( scope, edge1 ).execute();
-
-        value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
-                         .toBlockingObservable().single();
-
-        assertEquals( "No subtypes removed, edges exist", 2, value );
-
-        edgeSerialization.deleteEdge( scope, edge2 ).execute();
-
-        value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
-                         .toBlockingObservable().single();
-
-        assertEquals( "No subtypes removed, edges exist", 1, value );
-
-        edgeSerialization.deleteEdge( scope, edge3 ).execute();
-
-        value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
-                         .toBlockingObservable().single();
-
-
-        assertEquals( "Single subtype should be removed", 0, value );
-
-        //now verify they're gone
-
-        Iterator<String> edgeTypes = edgeMetadataSerialization
-                .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( edge1.getTargetNode(), null ) );
-
-        assertFalse( "No edge types exist", edgeTypes.hasNext() );
-
-
-        Iterator<String> sourceTypes = edgeMetadataSerialization
-                .getIdTypesToTarget( scope, new SimpleSearchIdType( edge1.getTargetNode(), edge1.getType(), null ) );
-
-        assertFalse( "No edge types exist", sourceTypes.hasNext() );
-    }
-
-
-    @Test
-    public void cleanTargetMultipleEdgeBuffer() throws ConnectionException {
-
-        final Id targetId = createId( "target" );
-        final String edgeType = "test";
-
-        final int size =  graphFig.getRepairConcurrentSize()*2;
-
-        Set<Edge> writtenEdges = new HashSet<Edge>();
-
-
-        for(int i = 0; i < size; i ++){
-            Edge edge = createEdge( createId("source"+i), edgeType, targetId);
-
-            edgeSerialization.writeEdge( scope, edge ).execute();
-
-            edgeMetadataSerialization.writeEdge( scope, edge ).execute();
-
-            writtenEdges.add( edge );
-        }
-
-
-        UUID cleanupVersion = UUIDGenerator.newTimeUUID();
-
-        int value = edgeAsync.clearTargets( scope, targetId, edgeType, cleanupVersion )
-                             .toBlockingObservable().single();
-
-        assertEquals( "No subtypes removed, edges exist", size, value );
-
-        //now delete the edge
-
-        for(Edge created: writtenEdges){
-            edgeSerialization.deleteEdge( scope, created ).execute();
-        }
-
-
-        value = edgeAsync.clearTargets( scope, targetId, edgeType, cleanupVersion )
-                                     .toBlockingObservable().single();
-
-        assertEquals( "Subtypes removed", 0, value );
-
-        //now verify they're gone
-
-        Iterator<String> edgeTypes = edgeMetadataSerialization
-                .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( targetId, null ) );
-
-        assertFalse( "No edge types exist", edgeTypes.hasNext() );
-
-
-        Iterator<String> sourceTypes = edgeMetadataSerialization
-                .getIdTypesToTarget( scope, new SimpleSearchIdType( targetId, edgeType, null ) );
-
-        assertFalse( "No edge types exist", sourceTypes.hasNext() );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96e6f5fc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java
new file mode 100644
index 0000000..7bd1a68
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+
+import org.jukito.JukitoRunner;
+import org.jukito.UseModules;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ *
+ *
+ */
+@RunWith( JukitoRunner.class )
+@UseModules( { TestGraphModule.class } )
+public class EdgeMetaRepairTest {
+
+
+    @ClassRule
+    public static CassandraRule rule = new CassandraRule();
+
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
+    @Inject
+    protected EdgeMetaRepair edgeMetaRepair;
+
+    @Inject
+    protected EdgeSerialization edgeSerialization;
+
+    @Inject
+    protected EdgeMetadataSerialization edgeMetadataSerialization;
+
+    @Inject
+    protected GraphFig graphFig;
+
+    protected OrganizationScope scope;
+
+
+    @Before
+    public void setup() {
+        scope = mock( OrganizationScope.class );
+
+        Id orgId = mock( Id.class );
+
+        when( orgId.getType() ).thenReturn( "organization" );
+        when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+        when( scope.getOrganization() ).thenReturn( orgId );
+    }
+
+
+    @Test
+    public void cleanTargetNoEdgesNoMeta() {
+        //do no writes, then execute a cleanup with no meta data
+
+        final Id targetId = createId( "target" );
+        final String test = "test";
+        final UUID version = UUIDGenerator.newTimeUUID();
+
+        int value = edgeMetaRepair.repairTargets( scope, targetId, test, version ).toBlockingObservable().single();
+
+        assertEquals( "No subtypes found", 0, value );
+    }
+
+
+    @Test
+    public void cleanTargetSingleEdge() throws ConnectionException {
+        Edge edge = createEdge( "source", "test", "target" );
+
+        edgeSerialization.writeEdge( scope, edge ).execute();
+
+        edgeMetadataSerialization.writeEdge( scope, edge ).execute();
+
+        int value = edgeMetaRepair.repairTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() )
+                             .toBlockingObservable().single();
+
+        assertEquals( "No subtypes removed, edge exists", 1, value );
+
+        //now delete the edge
+
+        edgeSerialization.deleteEdge( scope, edge ).execute();
+
+        value = edgeMetaRepair.repairTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() )
+                         .toBlockingObservable().single();
+
+        assertEquals( "Single subtype should be removed", 0, value );
+
+        //now verify they're gone
+
+        Iterator<String> edgeTypes = edgeMetadataSerialization
+                .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( edge.getTargetNode(), null ) );
+
+        assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+        Iterator<String> sourceTypes = edgeMetadataSerialization
+                .getIdTypesToTarget( scope, new SimpleSearchIdType( edge.getTargetNode(), edge.getType(), null ) );
+
+        assertFalse( "No edge types exist", sourceTypes.hasNext() );
+    }
+
+
+    @Test
+    public void cleanTargetMultipleEdge() throws ConnectionException {
+
+        Id targetId = createId( "target" );
+
+        Edge edge1 = createEdge( createId("source1"), "test", targetId);
+
+
+
+        edgeSerialization.writeEdge( scope, edge1 ).execute();
+
+        edgeMetadataSerialization.writeEdge( scope, edge1 ).execute();
+
+        Edge edge2 = createEdge( createId("source2"), "test", targetId );
+
+        edgeSerialization.writeEdge( scope, edge2 ).execute();
+
+        edgeMetadataSerialization.writeEdge( scope, edge2 ).execute();
+
+        Edge edge3 = createEdge( createId("source3"), "test", targetId );
+
+        edgeSerialization.writeEdge( scope, edge3 ).execute();
+
+        edgeMetadataSerialization.writeEdge( scope, edge3 ).execute();
+
+
+        UUID cleanupVersion = UUIDGenerator.newTimeUUID();
+
+        int value = edgeMetaRepair.repairTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+                             .toBlockingObservable().single();
+
+        assertEquals( "No subtypes removed, edges exist", 3, value );
+
+        //now delete the edge
+
+        edgeSerialization.deleteEdge( scope, edge1 ).execute();
+
+        value = edgeMetaRepair.repairTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+                         .toBlockingObservable().single();
+
+        assertEquals( "No subtypes removed, edges exist", 2, value );
+
+        edgeSerialization.deleteEdge( scope, edge2 ).execute();
+
+        value = edgeMetaRepair.repairTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+                         .toBlockingObservable().single();
+
+        assertEquals( "No subtypes removed, edges exist", 1, value );
+
+        edgeSerialization.deleteEdge( scope, edge3 ).execute();
+
+        value = edgeMetaRepair.repairTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+                         .toBlockingObservable().single();
+
+
+        assertEquals( "Single subtype should be removed", 0, value );
+
+        //now verify they're gone
+
+        Iterator<String> edgeTypes = edgeMetadataSerialization
+                .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( edge1.getTargetNode(), null ) );
+
+        assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+        Iterator<String> sourceTypes = edgeMetadataSerialization
+                .getIdTypesToTarget( scope, new SimpleSearchIdType( edge1.getTargetNode(), edge1.getType(), null ) );
+
+        assertFalse( "No edge types exist", sourceTypes.hasNext() );
+    }
+
+
+    @Test
+    public void cleanTargetMultipleEdgeBuffer() throws ConnectionException {
+
+        final Id targetId = createId( "target" );
+        final String edgeType = "test";
+
+        final int size =  graphFig.getRepairConcurrentSize()*2;
+
+        Set<Edge> writtenEdges = new HashSet<Edge>();
+
+
+        for(int i = 0; i < size; i ++){
+            Edge edge = createEdge( createId("source"+i), edgeType, targetId);
+
+            edgeSerialization.writeEdge( scope, edge ).execute();
+
+            edgeMetadataSerialization.writeEdge( scope, edge ).execute();
+
+            writtenEdges.add( edge );
+        }
+
+
+        UUID cleanupVersion = UUIDGenerator.newTimeUUID();
+
+        int value = edgeMetaRepair.repairTargets( scope, targetId, edgeType, cleanupVersion )
+                             .toBlockingObservable().single();
+
+        assertEquals( "No subtypes removed, edges exist", size, value );
+
+        //now delete the edge
+
+        for(Edge created: writtenEdges){
+            edgeSerialization.deleteEdge( scope, created ).execute();
+        }
+
+
+        value = edgeMetaRepair.repairTargets( scope, targetId, edgeType, cleanupVersion )
+                                     .toBlockingObservable().single();
+
+        assertEquals( "Subtypes removed", 0, value );
+
+        //now verify they're gone
+
+        Iterator<String> edgeTypes = edgeMetadataSerialization
+                .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( targetId, null ) );
+
+        assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+        Iterator<String> sourceTypes = edgeMetadataSerialization
+                .getIdTypesToTarget( scope, new SimpleSearchIdType( targetId, edgeType, null ) );
+
+        assertFalse( "No edge types exist", sourceTypes.hasNext() );
+    }
+
+
+
+    @Test
+    public void cleanSourceSingleEdge() throws ConnectionException {
+        Edge edge = createEdge( "source", "test", "target" );
+
+        edgeSerialization.writeEdge( scope, edge ).execute();
+
+        edgeMetadataSerialization.writeEdge( scope, edge ).execute();
+
+        int value = edgeMetaRepair.repairSources( scope, edge.getSourceNode(), edge.getType(), edge.getVersion() )
+                             .toBlockingObservable().single();
+
+        assertEquals( "No subtypes removed, edge exists", 1, value );
+
+        //now delete the edge
+
+        edgeSerialization.deleteEdge( scope, edge ).execute();
+
+        value = edgeMetaRepair.repairSources( scope, edge.getSourceNode(), edge.getType(), edge.getVersion() )
+                         .toBlockingObservable().single();
+
+        assertEquals( "Single subtype should be removed", 0, value );
+
+        //now verify they're gone
+
+        Iterator<String> edgeTypes = edgeMetadataSerialization
+                .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( edge.getSourceNode(), null ) );
+
+        assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+        Iterator<String> sourceTypes = edgeMetadataSerialization
+                .getIdTypesToTarget( scope, new SimpleSearchIdType( edge.getSourceNode(), edge.getType(), null ) );
+
+        assertFalse( "No edge types exist", sourceTypes.hasNext() );
+    }
+
+
+    @Test
+    public void cleanSourceMultipleEdge() throws ConnectionException {
+
+        Id sourceId = createId( "source" );
+
+        Edge edge1 = createEdge( sourceId, "test", createId("target1"));
+
+
+
+        edgeSerialization.writeEdge( scope, edge1 ).execute();
+
+        edgeMetadataSerialization.writeEdge( scope, edge1 ).execute();
+
+        Edge edge2 = createEdge( sourceId, "test", createId("target2") );
+
+        edgeSerialization.writeEdge( scope, edge2 ).execute();
+
+        edgeMetadataSerialization.writeEdge( scope, edge2 ).execute();
+
+        Edge edge3 = createEdge( sourceId, "test", createId("target3") );
+
+        edgeSerialization.writeEdge( scope, edge3 ).execute();
+
+        edgeMetadataSerialization.writeEdge( scope, edge3 ).execute();
+
+
+        UUID cleanupVersion = UUIDGenerator.newTimeUUID();
+
+        int value = edgeMetaRepair.repairSources( scope, edge1.getSourceNode(), edge1.getType(), cleanupVersion )
+                             .toBlockingObservable().single();
+
+        assertEquals( "No subtypes removed, edges exist", 3, value );
+
+        //now delete the edge
+
+        edgeSerialization.deleteEdge( scope, edge1 ).execute();
+
+        value = edgeMetaRepair.repairSources( scope, edge1.getSourceNode(), edge1.getType(), cleanupVersion )
+                         .toBlockingObservable().single();
+
+        assertEquals( "No subtypes removed, edges exist", 2, value );
+
+        edgeSerialization.deleteEdge( scope, edge2 ).execute();
+
+        value = edgeMetaRepair.repairSources( scope, edge1.getSourceNode(), edge1.getType(), cleanupVersion )
+                         .toBlockingObservable().single();
+
+        assertEquals( "No subtypes removed, edges exist", 1, value );
+
+        edgeSerialization.deleteEdge( scope, edge3 ).execute();
+
+        value = edgeMetaRepair.repairSources( scope, edge1.getSourceNode(), edge1.getType(), cleanupVersion )
+                         .toBlockingObservable().single();
+
+
+        assertEquals( "Single subtype should be removed", 0, value );
+
+        //now verify they're gone
+
+        Iterator<String> edgeTypes = edgeMetadataSerialization
+                .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( edge1.getSourceNode(), null ) );
+
+        assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+        Iterator<String> sourceTypes = edgeMetadataSerialization
+                .getIdTypesToTarget( scope, new SimpleSearchIdType( edge1.getSourceNode(), edge1.getType(), null ) );
+
+        assertFalse( "No edge types exist", sourceTypes.hasNext() );
+    }
+
+
+    @Test
+    public void cleanSourceMultipleEdgeBuffer() throws ConnectionException {
+
+        Id sourceId = createId( "source" );
+
+        final String edgeType = "test";
+
+        final int size =  graphFig.getRepairConcurrentSize()*2;
+
+        Set<Edge> writtenEdges = new HashSet<Edge>();
+
+
+        for(int i = 0; i < size; i ++){
+            Edge edge = createEdge( sourceId, edgeType, createId("target"+i));
+
+            edgeSerialization.writeEdge( scope, edge ).execute();
+
+            edgeMetadataSerialization.writeEdge( scope, edge ).execute();
+
+            writtenEdges.add( edge );
+        }
+
+
+        UUID cleanupVersion = UUIDGenerator.newTimeUUID();
+
+        int value = edgeMetaRepair.repairSources( scope, sourceId, edgeType, cleanupVersion )
+                             .toBlockingObservable().single();
+
+        assertEquals( "No subtypes removed, edges exist", size, value );
+
+        //now delete the edge
+
+        for(Edge created: writtenEdges){
+            edgeSerialization.deleteEdge( scope, created ).execute();
+        }
+
+
+        value = edgeMetaRepair.repairSources( scope, sourceId, edgeType, cleanupVersion )
+                                     .toBlockingObservable().single();
+
+        assertEquals( "Subtypes removed", 0, value );
+
+        //now verify they're gone
+
+        Iterator<String> edgeTypes = edgeMetadataSerialization
+                .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( sourceId, null ) );
+
+        assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+        Iterator<String> sourceTypes = edgeMetadataSerialization
+                .getIdTypesToTarget( scope, new SimpleSearchIdType( sourceId, edgeType, null ) );
+
+        assertFalse( "No edge types exist", sourceTypes.hasNext() );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96e6f5fc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
new file mode 100644
index 0000000..352639e
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jukito.JukitoRunner;
+import org.jukito.UseModules;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ *
+ *
+ */
+@RunWith( JukitoRunner.class )
+@UseModules( { TestGraphModule.class } )
+public class EdgeWriteRepairTest {
+
+    @ClassRule
+    public static CassandraRule rule = new CassandraRule();
+
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
+    @Inject
+    protected EdgeSerialization edgeSerialization;
+
+    @Inject
+    protected EdgeWriteRepair edgeWriteRepair;
+
+
+    protected OrganizationScope scope;
+
+
+    @Before
+    public void setup() {
+        scope = mock( OrganizationScope.class );
+
+        Id orgId = mock( Id.class );
+
+        when( orgId.getType() ).thenReturn( "organization" );
+        when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+        when( scope.getOrganization() ).thenReturn( orgId );
+    }
+
+
+    /**
+     * Test repairing with no edges
+     */
+    @Test
+    public void noEdges() {
+        Edge edge = createEdge( "source", "test", "target" );
+
+        Iterator<MarkedEdge> edges = edgeWriteRepair.repair( scope, edge ).toBlockingObservable().getIterator();
+
+        assertFalse( "No edges cleaned", edges.hasNext() );
+    }
+
+
+    /**
+     * Test repairing with no edges
+     */
+    @Test
+    public void versionTest() throws ConnectionException {
+        final int size = 10;
+
+        final List<Edge> versions = new ArrayList<Edge>( size );
+
+        final Id sourceId = createId( "source" );
+        final Id targetId = createId( "target" );
+        final String edgeType = "edge";
+
+        for ( int i = 0; i < size; i++ ) {
+            final Edge edge = createEdge( sourceId, edgeType, targetId );
+
+            versions.add( edge );
+
+            edgeSerialization.writeEdge( scope, edge ).execute();
+
+            System.out.println(String.format("[%d] %s", i, edge));
+        }
+
+
+        int keepIndex = size / 2;
+
+        Edge keep = versions.get( keepIndex );
+
+        Iterable<MarkedEdge> edges = edgeWriteRepair.repair( scope, keep ).toBlockingObservable().toIterable();
+
+
+        int index = 0;
+
+        for ( MarkedEdge edge : edges ) {
+
+            final Edge removed = versions.get( keepIndex - index -1 );
+
+            assertEquals( "Removed matches saved index", removed, edge );
+
+            index++;
+        }
+
+        //now verify we get all the versions we expect back
+        Iterator<MarkedEdge> iterator = edgeSerialization.getEdgeFromSource( scope,
+                new SimpleSearchByEdge( sourceId, edgeType, targetId, UUIDGenerator.newTimeUUID(), null ) );
+
+        index = 0;
+
+        for(MarkedEdge edge: new IterableWrapper<MarkedEdge>( iterator )){
+
+            final Edge saved = versions.get( size - index -1 );
+
+            assertEquals(saved, edge);
+
+            index++;
+        }
+
+        assertEquals("Kept edge version was the minimum", keepIndex, index);
+    }
+
+
+    private class IterableWrapper<T> implements Iterable<T> {
+        private final Iterator<T> sourceIterator;
+
+
+        private IterableWrapper( final Iterator<T> sourceIterator ) {
+            this.sourceIterator = sourceIterator;
+        }
+
+
+        @Override
+        public Iterator<T> iterator() {
+            return this.sourceIterator;
+        }
+    }
+}


[2/2] git commit: Finished serialization I/O impls

Posted by sn...@apache.org.
Finished serialization I/O impls


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/078ca7dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/078ca7dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/078ca7dc

Branch: refs/heads/asyncqueue
Commit: 078ca7dca71ce62b7a5ae0469f7485563582145c
Parents: 96e6f5f
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 14 15:14:11 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 14 15:14:11 2014 -0700

----------------------------------------------------------------------
 .../persistence/graph/guice/GraphModule.java    |   4 +
 .../graph/impl/stage/AbstractEdgeRepair.java    | 152 +++++++++++++++
 .../graph/impl/stage/EdgeDeleteRepairImpl.java  |  76 ++++++++
 .../graph/impl/stage/EdgeWriteRepairImpl.java   |  95 +---------
 .../graph/impl/stage/EdgeDeleteRepairTest.java  | 189 +++++++++++++++++++
 .../graph/impl/stage/EdgeWriteRepairTest.java   |   2 +
 6 files changed, 431 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index c81dc82..a5dda9a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -34,6 +34,8 @@ import org.apache.usergrid.persistence.graph.consistency.LocalTimeoutQueue;
 import org.apache.usergrid.persistence.graph.consistency.TimeoutQueue;
 import org.apache.usergrid.persistence.graph.impl.CollectionIndexObserver;
 import org.apache.usergrid.persistence.graph.impl.EdgeManagerImpl;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteRepair;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteRepairImpl;
 import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepair;
 import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepairImpl;
 import org.apache.usergrid.persistence.graph.impl.stage.EdgeWriteRepair;
@@ -107,6 +109,8 @@ public class GraphModule extends AbstractModule {
         bind( EdgeMetaRepair.class).to( EdgeMetaRepairImpl.class );
 
         bind( EdgeWriteRepair.class).to( EdgeWriteRepairImpl.class );
+
+        bind( EdgeDeleteRepair.class).to( EdgeDeleteRepairImpl.class );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
new file mode 100644
index 0000000..838de69
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Func1;
+
+
+/**
+ * SimpleRepair operation
+ *
+ */
+@Singleton
+public abstract class AbstractEdgeRepair  {
+
+    protected final EdgeSerialization edgeSerialization;
+    protected final GraphFig graphFig;
+    protected final Keyspace keyspace;
+    protected final Scheduler scheduler;
+
+
+    @Inject
+    public AbstractEdgeRepair( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
+                               final Keyspace keyspace, final Scheduler scheduler ) {
+        this.edgeSerialization = edgeSerialization;
+        this.graphFig = graphFig;
+        this.keyspace = keyspace;
+        this.scheduler = scheduler;
+    }
+
+
+
+    public Observable<MarkedEdge> repair( final OrganizationScope scope, final Edge edge ) {
+
+        final UUID maxVersion = edge.getVersion();
+
+        //get source edges
+        Observable<MarkedEdge> sourceEdges = getEdgeVersionsFromSource( scope, edge );
+
+        //get target edges
+        Observable<MarkedEdge> targetEdges = getEdgeVersionsToTarget( scope, edge );
+
+
+        //merge source and target then deal with the distinct values
+        return Observable.merge( sourceEdges, targetEdges ).filter(getFilter(maxVersion)).distinctUntilChanged().buffer( graphFig.getScanPageSize() )
+                         .flatMap( new Func1<List<MarkedEdge>, Observable<MarkedEdge>>() {
+                             @Override
+                             public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
+                                 final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                                 for ( MarkedEdge edge : markedEdges ) {
+                                     final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
+
+                                     batch.mergeShallow( delete );
+                                 }
+
+                                 try {
+                                     batch.execute();
+                                 }
+                                 catch ( ConnectionException e ) {
+                                     throw new RuntimeException( "Unable to issue write to cassandra", e );
+                                 }
+
+                                 return Observable.from( markedEdges ).subscribeOn( scheduler );
+                             }
+             } );
+    }
+
+
+    /**
+     * A filter used to filter out edges appropriately for the max version
+     *
+     * @param maxVersion the max version to use
+     * @return The filter to use in the max version
+     */
+    protected abstract Func1<MarkedEdge, Boolean> getFilter(final UUID maxVersion);
+
+    /**
+     * Get all edge versions <= the specified max from the source
+     */
+    private Observable<MarkedEdge> getEdgeVersionsFromSource( final OrganizationScope scope, final Edge edge ) {
+
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+
+                final SimpleSearchByEdge search =
+                        new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
+                                edge.getVersion(), null );
+
+                return edgeSerialization.getEdgeFromSource( scope, search );
+            }
+        } ).subscribeOn( scheduler );
+    }
+
+
+    /**
+     * Get all edge versions <= the specified max from the source
+     */
+    private Observable<MarkedEdge> getEdgeVersionsToTarget( final OrganizationScope scope, final Edge edge ) {
+
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+
+                final SimpleSearchByEdge search =
+                        new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
+                                edge.getVersion(), null );
+
+                return edgeSerialization.getEdgeToTarget( scope, search );
+            }
+        } ).subscribeOn( scheduler );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
new file mode 100644
index 0000000..bf5d3d2
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Func1;
+
+
+/**
+ * SimpleRepair operation
+ */
+@Singleton
+public class EdgeDeleteRepairImpl extends AbstractEdgeRepair implements EdgeDeleteRepair {
+
+
+    @Inject
+    public EdgeDeleteRepairImpl( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
+                                 final Keyspace keyspace, final Scheduler scheduler ) {
+        super( edgeSerialization, graphFig, keyspace, scheduler );
+    }
+
+
+    @Override
+    public Observable<MarkedEdge> repair( final OrganizationScope scope, final Edge edge ) {
+
+        return super.repair( scope, edge );
+    }
+
+
+    @Override
+    protected Func1<MarkedEdge, Boolean> getFilter( final UUID maxVersion ) {
+        return new Func1<MarkedEdge, Boolean>() {
+            /**
+             * We only want to return edges < this version so we remove them
+             * @param markedEdge
+             * @return
+             */
+            @Override
+            public Boolean call( final MarkedEdge markedEdge ) {
+                return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 1;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
index 548c0e9..e8420a8 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
@@ -20,24 +20,18 @@
 package org.apache.usergrid.persistence.graph.impl.stage;
 
 
-import java.util.Iterator;
-import java.util.List;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.collection.OrganizationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
 
 import com.fasterxml.uuid.UUIDComparator;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.Scheduler;
@@ -46,41 +40,28 @@ import rx.functions.Func1;
 
 /**
  * SimpleRepair operation
- *
  */
 @Singleton
-public class EdgeWriteRepairImpl implements EdgeWriteRepair {
-
-    protected final EdgeSerialization edgeSerialization;
-    protected final GraphFig graphFig;
-    protected final Keyspace keyspace;
-    protected final Scheduler scheduler;
+public class EdgeWriteRepairImpl extends AbstractEdgeRepair implements EdgeWriteRepair {
 
 
     @Inject
     public EdgeWriteRepairImpl( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
                                 final Keyspace keyspace, final Scheduler scheduler ) {
-        this.edgeSerialization = edgeSerialization;
-        this.graphFig = graphFig;
-        this.keyspace = keyspace;
-        this.scheduler = scheduler;
+        super( edgeSerialization, graphFig, keyspace, scheduler );
     }
 
 
     @Override
     public Observable<MarkedEdge> repair( final OrganizationScope scope, final Edge edge ) {
 
-        final UUID maxVersion = edge.getVersion();
-
-        //get source edges
-        Observable<MarkedEdge> sourceEdges = getEdgeVersionsFromSource( scope, edge );
-
-        //get target edges
-        Observable<MarkedEdge> targetEdges = getEdgeVersionsToTarget( scope, edge );
+        return super.repair( scope, edge );
+    }
 
 
-        //merge source and target then deal with the distinct values
-        return Observable.merge( sourceEdges, targetEdges ).filter( new Func1<MarkedEdge, Boolean>() {
+    @Override
+    protected Func1<MarkedEdge, Boolean> getFilter( final UUID maxVersion ) {
+        return new Func1<MarkedEdge, Boolean>() {
             /**
              * We only want to return edges < this version so we remove them
              * @param markedEdge
@@ -90,66 +71,6 @@ public class EdgeWriteRepairImpl implements EdgeWriteRepair {
             public Boolean call( final MarkedEdge markedEdge ) {
                 return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
             }
-            //buffer the deletes and issue them in a single mutation
-        } ).distinctUntilChanged().buffer( graphFig.getScanPageSize() )
-                         .flatMap( new Func1<List<MarkedEdge>, Observable<MarkedEdge>>() {
-                             @Override
-                             public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
-                                 final MutationBatch batch = keyspace.prepareMutationBatch();
-
-                                 for ( MarkedEdge edge : markedEdges ) {
-                                     final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
-
-                                     batch.mergeShallow( delete );
-                                 }
-
-                                 try {
-                                     batch.execute();
-                                 }
-                                 catch ( ConnectionException e ) {
-                                     throw new RuntimeException( "Unable to issue write to cassandra", e );
-                                 }
-
-                                 return Observable.from( markedEdges ).subscribeOn( scheduler );
-                             }
-             } );
-    }
-
-
-    /**
-     * Get all edge versions <= the specified max from the source
-     */
-    private Observable<MarkedEdge> getEdgeVersionsFromSource( final OrganizationScope scope, final Edge edge ) {
-
-        return Observable.create( new ObservableIterator<MarkedEdge>() {
-            @Override
-            protected Iterator<MarkedEdge> getIterator() {
-
-                final SimpleSearchByEdge search =
-                        new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
-                                edge.getVersion(), null );
-
-                return edgeSerialization.getEdgeFromSource( scope, search );
-            }
-        } ).subscribeOn( scheduler );
-    }
-
-
-    /**
-     * Get all edge versions <= the specified max from the source
-     */
-    private Observable<MarkedEdge> getEdgeVersionsToTarget( final OrganizationScope scope, final Edge edge ) {
-
-        return Observable.create( new ObservableIterator<MarkedEdge>() {
-            @Override
-            protected Iterator<MarkedEdge> getIterator() {
-
-                final SimpleSearchByEdge search =
-                        new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
-                                edge.getVersion(), null );
-
-                return edgeSerialization.getEdgeToTarget( scope, search );
-            }
-        } ).subscribeOn( scheduler );
+        };
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
new file mode 100644
index 0000000..e8eedc9
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.stage;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jukito.JukitoRunner;
+import org.jukito.UseModules;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ *
+ *
+ */
+@RunWith( JukitoRunner.class )
+@UseModules( { TestGraphModule.class } )
+public class EdgeDeleteRepairTest {
+
+    @ClassRule
+    public static CassandraRule rule = new CassandraRule();
+
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
+    @Inject
+    protected EdgeSerialization edgeSerialization;
+
+    @Inject
+    protected EdgeDeleteRepair edgeDeleteRepair;
+
+
+    protected OrganizationScope scope;
+
+
+    @Before
+    public void setup() {
+        scope = mock( OrganizationScope.class );
+
+        Id orgId = mock( Id.class );
+
+        when( orgId.getType() ).thenReturn( "organization" );
+        when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+        when( scope.getOrganization() ).thenReturn( orgId );
+    }
+
+
+    /**
+     * Test repairing with no edges
+     */
+    @Test
+    public void noEdges() {
+        Edge edge = createEdge( "source", "test", "target" );
+
+        Iterator<MarkedEdge> edges = edgeDeleteRepair.repair( scope, edge ).toBlockingObservable().getIterator();
+
+        assertFalse( "No edges cleaned", edges.hasNext() );
+    }
+
+
+    /**
+     * Test repairing with no edges
+     * TODO: TN.  There appears to be a race condition here with ordering.  Not sure if this is intentional as part of the impl
+     * or if it's an issue
+     */
+    @Test
+    public void versionTest() throws ConnectionException {
+        final int size = 10;
+
+        final List<Edge> versions = new ArrayList<Edge>( size );
+
+        final Id sourceId = createId( "source" );
+        final Id targetId = createId( "target" );
+        final String edgeType = "edge";
+
+        for ( int i = 0; i < size; i++ ) {
+            final Edge edge = createEdge( sourceId, edgeType, targetId );
+
+            versions.add( edge );
+
+            edgeSerialization.writeEdge( scope, edge ).execute();
+
+            System.out.println(String.format("[%d] %s", i, edge));
+        }
+
+
+        int deleteIndex = size / 2;
+
+        Edge keep = versions.get( deleteIndex );
+
+        Iterable<MarkedEdge> edges = edgeDeleteRepair.repair( scope, keep ).toBlockingObservable().toIterable();
+
+
+        int index = 0;
+
+        for ( MarkedEdge edge : edges ) {
+
+            final Edge removed = versions.get( deleteIndex - index );
+
+            assertEquals( "Removed matches saved index", removed, edge );
+
+            index++;
+        }
+
+        //now verify we get all the versions we expect back
+        Iterator<MarkedEdge> iterator = edgeSerialization.getEdgeFromSource( scope,
+                new SimpleSearchByEdge( sourceId, edgeType, targetId, UUIDGenerator.newTimeUUID(), null ) );
+
+        index = 0;
+
+        for(MarkedEdge edge: new IterableWrapper<MarkedEdge>( iterator )){
+
+            final Edge saved = versions.get( size - index - 1);
+
+            assertEquals(saved, edge);
+
+            index++;
+        }
+
+        final int keptCount = size-deleteIndex;
+
+        assertEquals("Kept edge version was the minimum", keptCount, index+1);
+    }
+
+
+    private class IterableWrapper<T> implements Iterable<T> {
+        private final Iterator<T> sourceIterator;
+
+
+        private IterableWrapper( final Iterator<T> sourceIterator ) {
+            this.sourceIterator = sourceIterator;
+        }
+
+
+        @Override
+        public Iterator<T> iterator() {
+            return this.sourceIterator;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/078ca7dc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
index 352639e..f796c85 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
@@ -109,6 +109,8 @@ public class EdgeWriteRepairTest {
 
     /**
      * Test repairing with no edges
+     * TODO: TN.  There appears to be a race condition here with ordering.  Not sure if this is intentional as part of the impl
+     * or if it's an issue
      */
     @Test
     public void versionTest() throws ConnectionException {