You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/27 18:21:07 UTC

[20/27] git commit: Updated Cassandra to Resolve issue

Updated Cassandra to Resolve issue


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/16721c87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/16721c87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/16721c87

Branch: refs/heads/entity-manager
Commit: 16721c87921100bbfafa3697801a6579c6a1cb6e
Parents: 029c065
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 26 13:02:30 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 26 13:02:30 2014 -0700

----------------------------------------------------------------------
 stack/corepersistence/collection/pom.xml        |   7 +
 .../usergrid/persistence/graph/GraphFig.java    |  31 ++-
 .../persistence/graph/guice/EdgeWrite.java      |  37 ----
 .../persistence/graph/guice/GraphModule.java    |   6 +-
 .../graph/hystrix/HystrixGraphObservable.java   |  53 +++--
 .../graph/impl/EdgeWriteListener.java           | 125 -----------
 .../graph/impl/GraphManagerImpl.java            |  30 +--
 .../graph/impl/NodeDeleteListener.java          | 193 ++++++++++-------
 .../graph/impl/stage/AbstractEdgeRepair.java    |   3 +-
 .../graph/impl/stage/EdgeMetaRepairImpl.java    |  27 +--
 .../graph/impl/stage/EdgeWriteRepair.java       |  44 ----
 .../graph/impl/stage/EdgeWriteRepairImpl.java   |  76 -------
 .../impl/EdgeMetadataSerializationImpl.java     |   2 +-
 .../impl/EdgeSerializationImpl.java             |  22 +-
 .../impl/parse/ColumnNameIterator.java          |  19 +-
 .../impl/parse/ObservableIterator.java          |  18 +-
 .../graph/impl/NodeDeleteListenerTest.java      |  26 ++-
 .../graph/impl/stage/EdgeWriteRepairTest.java   | 208 -------------------
 .../graph/serialization/TestCount.java          |  36 ++--
 .../graph/src/test/resources/log4j.properties   |  36 ++++
 stack/corepersistence/pom.xml                   |   1 +
 21 files changed, 316 insertions(+), 684 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/collection/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml
index 808347e..038fb67 100644
--- a/stack/corepersistence/collection/pom.xml
+++ b/stack/corepersistence/collection/pom.xml
@@ -101,6 +101,12 @@
         </dependency>
 
         <dependency>
+               <groupId>org.apache.cassandra</groupId>
+               <artifactId>cassandra-all</artifactId>
+               <version>${cassandra.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.safehaus.guicyfig</groupId>
             <artifactId>guicyfig</artifactId>
             <version>${guicyfig.version}</version>
@@ -176,6 +182,7 @@
              </dependency>
 
 
+
       <!-- Re-add once this is done
       https://github.com/Netflix/Hystrix/pull/209-->
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index de7f72f..80e87f6 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -19,7 +19,6 @@ public interface GraphFig extends GuicyFig {
 
     public static final String REPAIR_TIMEOUT = "usergrid.graph.repair.timeout";
 
-
     public static final String TIMEOUT_SIZE = "usergrid.graph.timeout.page.size";
 
     public static final String TIMEOUT_TASK_TIME = "usergrid.graph.timeout.task.time";
@@ -32,11 +31,11 @@ public interface GraphFig extends GuicyFig {
 
     public static final String READ_TIMEOUT = "usergrid.graph.read.timeout";
 
-    @Default("1000")
+    @Default( "1000" )
     @Key(SCAN_PAGE_SIZE)
     int getScanPageSize();
 
-    @Default("CL_ONE")
+    @Default("CL_QUORUM")
     @Key(READ_CL)
     String getReadCL();
 
@@ -44,31 +43,31 @@ public interface GraphFig extends GuicyFig {
     @Key(WRITE_CL)
     String getWriteCL();
 
-//    @Default("10000")
-//    @Key(WRITE_TIMEOUT)
-//    int getWriteTimeout();
+    @Default( "10000" )
+    @Key( WRITE_TIMEOUT )
+    int getWriteTimeout();
 
     /**
      * Get the read timeout (in milliseconds) that we should allow when reading from the data source
      */
-    @Default( "10000" )
-    @Key( READ_TIMEOUT )
+    @Default("10000")
+    @Key(READ_TIMEOUT)
     int getReadTimeout();
 
-    @Default( "100" )
-    @Key( TIMEOUT_SIZE )
+    @Default("100")
+    @Key(TIMEOUT_SIZE)
     int getTimeoutReadSize();
 
-    @Default( "500" )
-    @Key( TIMEOUT_TASK_TIME )
+    @Default("500")
+    @Key(TIMEOUT_TASK_TIME)
     long getTaskLoopTime();
 
-    @Default( "10" )
-    @Key( REPAIR_CONCURRENT_SIZE )
+    @Default("5")
+    @Key(REPAIR_CONCURRENT_SIZE)
     int getRepairConcurrentSize();
 
     @Default("10000")
-      @Key(WRITE_TIMEOUT)
-      int getRepairTimeout();
+    @Key(REPAIR_TIMEOUT)
+    int getRepairTimeout();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeWrite.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeWrite.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeWrite.java
deleted file mode 100644
index b8afb13..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeWrite.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.guice;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-@BindingAnnotation
-@Target( { FIELD, PARAMETER, METHOD } )
-@Retention( RUNTIME )
-public @interface EdgeWrite {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index d575471..072cefc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -24,9 +24,9 @@ import org.safehaus.guicyfig.GuicyFigModule;
 import org.apache.usergrid.persistence.collection.guice.CollectionModule;
 import org.apache.usergrid.persistence.collection.migration.Migration;
 import org.apache.usergrid.persistence.collection.mvcc.event.PostProcessObserver;
+import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
 import org.apache.usergrid.persistence.graph.consistency.AsyncProcessorImpl;
 import org.apache.usergrid.persistence.graph.consistency.LocalTimeoutQueue;
@@ -37,8 +37,6 @@ import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteRepair;
 import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteRepairImpl;
 import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepair;
 import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepairImpl;
-import org.apache.usergrid.persistence.graph.impl.stage.EdgeWriteRepair;
-import org.apache.usergrid.persistence.graph.impl.stage.EdgeWriteRepairImpl;
 import org.apache.usergrid.persistence.graph.serialization.CassandraConfig;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
@@ -97,13 +95,11 @@ public class GraphModule extends AbstractModule {
         bind(TimeoutQueue.class).to( LocalTimeoutQueue.class );
 
         bind(AsyncProcessor.class).annotatedWith( EdgeDelete.class ).to( AsyncProcessorImpl.class );
-        bind(AsyncProcessor.class).annotatedWith( EdgeWrite.class ).to( AsyncProcessorImpl.class );
         bind(AsyncProcessor.class).annotatedWith( NodeDelete.class ).to( AsyncProcessorImpl.class );
 
         //Repair/cleanup classes
         bind( EdgeMetaRepair.class).to( EdgeMetaRepairImpl.class );
 
-        bind( EdgeWriteRepair.class).to( EdgeWriteRepairImpl.class );
 
         bind( EdgeDeleteRepair.class).to( EdgeDeleteRepairImpl.class );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/hystrix/HystrixGraphObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/hystrix/HystrixGraphObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/hystrix/HystrixGraphObservable.java
index 8a44389..209dfbd 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/hystrix/HystrixGraphObservable.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/hystrix/HystrixGraphObservable.java
@@ -20,33 +20,52 @@
 package org.apache.usergrid.persistence.graph.hystrix;
 
 
-import com.netflix.hystrix.HystrixCommand;
 import com.netflix.hystrix.HystrixCommandGroupKey;
 import com.netflix.hystrix.HystrixObservableCommand;
 
 import rx.Observable;
-import rx.schedulers.Schedulers;
 
 
 /**
- *
- *
+ * A utility class that creates graph observables wrapped in Hystrix for timeouts and circuit breakers.
  */
 public class HystrixGraphObservable {
 
     /**
-     * Wrap the observable in the timeout
-     * @param observable
-     * @param <T>
-     * @return
+     * Command group used for realtime user commands
+     */
+    private static final HystrixCommandGroupKey USER_GROUP = HystrixCommandGroupKey.Factory.asKey( "Graph-User" );
+
+    /**
+     * Command group for asynchronous operations
+     */
+    private static final HystrixCommandGroupKey ASYNC_GROUP = HystrixCommandGroupKey.Factory.asKey( "Graph-Async" );
+
+
+    /**
+     * Wrap the observable in the timeout for user facing operation.  This is for user reads and deletes.
      */
-    public static <T> Observable<T> wrap(final Observable<T> observable){
-            return new HystrixObservableCommand<T>( HystrixCommandGroupKey.Factory.asKey( "Graph" ) ){
-
-                @Override
-                protected Observable<T> run() {
-                    return observable;
-                }
-            }.toObservable( Schedulers.io() );
-        }
+    public static <T> Observable<T> user( final Observable<T> observable ) {
+        return new HystrixObservableCommand<T>( USER_GROUP ) {
+
+            @Override
+            protected Observable<T> run() {
+                return observable;
+            }
+        }.observe();
+    }
+
+
+    /**
+      * Wrap the observable in the timeout for asynchronous operations.  This is for compaction and cleanup processing.
+      */
+     public static <T> Observable<T> async( final Observable<T> observable ) {
+         return new HystrixObservableCommand<T>( ASYNC_GROUP ) {
+
+             @Override
+             protected Observable<T> run() {
+                 return observable;
+             }
+         }.observe();
+     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
deleted file mode 100644
index 90146dc..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.impl;
-
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
-import org.apache.usergrid.persistence.graph.consistency.MessageListener;
-import org.apache.usergrid.persistence.graph.guice.EdgeWrite;
-import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import rx.Observable;
-import rx.functions.Func1;
-
-
-/**
- * Construct the asynchronous edge lister for the repair operation.
- */
-@Singleton
-public class EdgeWriteListener implements MessageListener<EdgeEvent<Edge>, EdgeEvent<Edge>> {
-
-    private final EdgeSerialization edgeSerialization;
-    private final GraphFig graphFig;
-    private final Keyspace keyspace;
-
-
-    public EdgeWriteListener( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
-                              final Keyspace keyspace, @EdgeWrite final AsyncProcessor edgeWrite ) {
-        this.edgeSerialization = edgeSerialization;
-        this.graphFig = graphFig;
-        this.keyspace = keyspace;
-        edgeWrite.addListener( this );
-    }
-
-
-    @Override
-    public Observable<EdgeEvent<Edge>> receive( final EdgeEvent<Edge> write ) {
-
-        final Edge edge = write.getData();
-        final OrganizationScope scope = write.getOrganizationScope();
-        final UUID maxVersion = edge.getVersion();
-
-        return Observable.empty();
-
-//      TODO T.N, some async processing for balancing here
-//  return Observable.create( new ObservableIterator<MarkedEdge>() {
-//            @Override
-//            protected Iterator<MarkedEdge> getIterator() {
-//
-//                final SimpleSearchByEdge search =
-//                        new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), maxVersion,
-//                                null );
-//
-//                return edgeSerialization.getEdgeVersions( scope, search );
-//            }
-//        } ).filter( new Func1<MarkedEdge, Boolean>() {
-//
-//            //TODO, reuse this for delete operation
-//
-//
-//            /**
-//             * We only want to return edges < this version so we remove them
-//             * @param markedEdge
-//             * @return
-//             */
-//            @Override
-//            public Boolean call( final MarkedEdge markedEdge ) {
-//                return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
-//            }
-//            //buffer the deletes and issue them in a single mutation
-//        } ).buffer( graphFig.getScanPageSize() ).map( new Func1<List<MarkedEdge>, EdgeEvent<Edge>>() {
-//            @Override
-//            public EdgeEvent<Edge> call( final List<MarkedEdge> markedEdges ) {
-//
-//                final MutationBatch batch = keyspace.prepareMutationBatch();
-//
-//                for ( MarkedEdge edge : markedEdges ) {
-//                    final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
-//
-//                    batch.mergeShallow( delete );
-//                }
-//
-//                try {
-//                    batch.execute();
-//                }
-//                catch ( ConnectionException e ) {
-//                    throw new RuntimeException( "Unable to issue write to cassandra", e );
-//                }
-//
-//                return write;
-//            }
-//        } );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 94d4dc3..ff63a60 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -39,7 +39,6 @@ import org.apache.usergrid.persistence.graph.SearchIdType;
 import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
 import org.apache.usergrid.persistence.graph.consistency.AsynchronousMessage;
 import org.apache.usergrid.persistence.graph.guice.EdgeDelete;
-import org.apache.usergrid.persistence.graph.guice.EdgeWrite;
 import org.apache.usergrid.persistence.graph.guice.NodeDelete;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
@@ -56,7 +55,6 @@ import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
-import rx.Scheduler;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
@@ -76,7 +74,6 @@ public class GraphManagerImpl implements GraphManager {
 
     private final NodeSerialization nodeSerialization;
 
-    private final AsyncProcessor<Edge> edgeWriteAsyncProcessor;
     private final AsyncProcessor<Edge> edgeDeleteAsyncProcessor;
     private final AsyncProcessor<Id> nodeDeleteAsyncProcessor;
 
@@ -86,7 +83,7 @@ public class GraphManagerImpl implements GraphManager {
     @Inject
     public GraphManagerImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
                             final EdgeSerialization edgeSerialization, final NodeSerialization nodeSerialization,
-                            final GraphFig graphFig, @EdgeWrite final AsyncProcessor edgeWrite,
+                            final GraphFig graphFig,
                             @EdgeDelete final AsyncProcessor edgeDelete, @NodeDelete final AsyncProcessor nodeDelete,
                             @Assisted final OrganizationScope scope ) {
 
@@ -100,9 +97,6 @@ public class GraphManagerImpl implements GraphManager {
         this.graphFig = graphFig;
 
 
-        this.edgeWriteAsyncProcessor = edgeWrite;
-
-
         this.edgeDeleteAsyncProcessor = edgeDelete;
 
 
@@ -123,8 +117,6 @@ public class GraphManagerImpl implements GraphManager {
 
                 mutation.mergeShallow( edgeMutation );
 
-                final AsynchronousMessage<Edge> event = edgeWriteAsyncProcessor.setVerification( edge, getTimeout() );
-
                 try {
                     mutation.execute();
                 }
@@ -132,8 +124,6 @@ public class GraphManagerImpl implements GraphManager {
                     throw new RuntimeException( "Unable to connect to cassandra", e );
                 }
 
-                edgeWriteAsyncProcessor.start( event );
-
                 return edge;
             }
         } );
@@ -199,7 +189,7 @@ public class GraphManagerImpl implements GraphManager {
 
     @Override
     public Observable<Edge> loadEdgeVersions( final SearchByEdge searchByEdge ) {
-        return Observable.create( new ObservableIterator<MarkedEdge>() {
+        return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeVersions" ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgeVersions( scope, searchByEdge );
@@ -211,7 +201,7 @@ public class GraphManagerImpl implements GraphManager {
 
     @Override
     public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
-        return Observable.create( new ObservableIterator<MarkedEdge>() {
+        return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgesFromSource" ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesFromSource( scope, search );
@@ -223,7 +213,7 @@ public class GraphManagerImpl implements GraphManager {
 
     @Override
     public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
-        return Observable.create( new ObservableIterator<MarkedEdge>() {
+        return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgesToTarget" ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesToTarget( scope, search );
@@ -235,7 +225,7 @@ public class GraphManagerImpl implements GraphManager {
 
     @Override
     public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
-        return Observable.create( new ObservableIterator<MarkedEdge>() {
+        return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgesFromSourceByTargetType" ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesFromSourceByTargetType( scope, search );
@@ -248,7 +238,7 @@ public class GraphManagerImpl implements GraphManager {
 
     @Override
     public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
-        return Observable.create( new ObservableIterator<MarkedEdge>() {
+        return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgesToTargetBySourceType" ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesToTargetBySourceType( scope, search );
@@ -261,7 +251,7 @@ public class GraphManagerImpl implements GraphManager {
     @Override
     public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) {
 
-        return Observable.create( new ObservableIterator<String>() {
+        return Observable.create( new ObservableIterator<String>( "getEdgeTypesFromSource" ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
@@ -272,7 +262,7 @@ public class GraphManagerImpl implements GraphManager {
 
     @Override
     public Observable<String> getIdTypesFromSource( final SearchIdType search ) {
-        return Observable.create( new ObservableIterator<String>() {
+        return Observable.create( new ObservableIterator<String>( "getIdTypesFromSource" ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getIdTypesFromSource( scope, search );
@@ -284,7 +274,7 @@ public class GraphManagerImpl implements GraphManager {
     @Override
     public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) {
 
-        return Observable.create( new ObservableIterator<String>() {
+        return Observable.create( new ObservableIterator<String>( "getEdgeTypesToTarget" ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
@@ -295,7 +285,7 @@ public class GraphManagerImpl implements GraphManager {
 
     @Override
     public Observable<String> getIdTypesToTarget( final SearchIdType search ) {
-        return Observable.create( new ObservableIterator<String>() {
+        return Observable.create( new ObservableIterator<String>( "getIdTypesToTarget" ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getIdTypesToTarget( scope, search );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index d0c0bec..aa4f94e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -1,17 +1,16 @@
 package org.apache.usergrid.persistence.graph.impl;
 
 
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.thrift.Mutation;
-
 import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
@@ -19,7 +18,6 @@ import org.apache.usergrid.persistence.graph.SearchEdgeType;
 import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
 import org.apache.usergrid.persistence.graph.consistency.MessageListener;
 import org.apache.usergrid.persistence.graph.guice.NodeDelete;
-import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteRepair;
 import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepair;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
@@ -35,7 +33,6 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.functions.Action0;
-import rx.functions.Action1;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
@@ -51,7 +48,6 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
     private final NodeSerialization nodeSerialization;
     private final EdgeSerialization edgeSerialization;
     private final EdgeMetadataSerialization edgeMetadataSerialization;
-    private final EdgeDeleteRepair edgeDeleteRepair;
     private final EdgeMetaRepair edgeMetaRepair;
     private final GraphFig graphFig;
     protected final Keyspace keyspace;
@@ -63,8 +59,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
     @Inject
     public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization,
 
-                               final EdgeMetadataSerialization edgeMetadataSerialization,
-                               final EdgeDeleteRepair edgeDeleteRepair, final EdgeMetaRepair edgeMetaRepair,
+                               final EdgeMetadataSerialization edgeMetadataSerialization, final EdgeMetaRepair edgeMetaRepair,
                                final GraphFig graphFig, @NodeDelete final AsyncProcessor nodeDelete,
                                final Keyspace keyspace ) {
 
@@ -72,7 +67,6 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
         this.nodeSerialization = nodeSerialization;
         this.edgeSerialization = edgeSerialization;
         this.edgeMetadataSerialization = edgeMetadataSerialization;
-        this.edgeDeleteRepair = edgeDeleteRepair;
         this.edgeMetaRepair = edgeMetaRepair;
         this.graphFig = graphFig;
         this.keyspace = keyspace;
@@ -121,7 +115,8 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
 
                         //get all edges pointing to the target node and buffer then into groups for deletion
                         Observable<MarkedEdge> targetEdges =
-                                getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null ) )
+                                getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null ) ).subscribeOn(
+                                        Schedulers.io() )
                                         .flatMap( new Func1<String, Observable<MarkedEdge>>() {
                                             @Override
                                             public Observable<MarkedEdge> call( final String edgeType ) {
@@ -133,7 +128,8 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
 
                         //get all edges pointing to the source node and buffer them into groups for deletion
                         Observable<MarkedEdge> sourceEdges =
-                                getEdgesTypesFromSource( scope, new SimpleSearchEdgeType( node, null ) )
+                                getEdgesTypesFromSource( scope, new SimpleSearchEdgeType( node, null ) ).subscribeOn(
+                                        Schedulers.io() )
                                         .flatMap( new Func1<String, Observable<MarkedEdge>>() {
                                             @Override
                                             public Observable<MarkedEdge> call( final String edgeType ) {
@@ -142,78 +138,87 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
                                             }
                                         } );
 
-
-                        //each time an edge is emitted, delete it via batch mutation since we'll already be buffered
+                        //merge both source and target into 1 observable.  We'll need to check them all regardless of order
                         return Observable.merge( targetEdges, sourceEdges );
                     }
                 } )
-                //buffer and delete marked edges in our buffer size
-                .buffer( graphFig.getScanPageSize() ).flatMap(
-                        new Func1<List<MarkedEdge>, Observable<MarkedEdge>>() {
-                            @Override
-                            public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
+                //buffer and delete marked edges in our buffer size so we're making less trips to cassandra
+                .buffer( graphFig.getScanPageSize() ).flatMap( new Func1<List<MarkedEdge>, Observable<MarkedEdge>>() {
+                    @Override
+                    public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
 
-                                LOG.debug( "Batching {} edges for deletion" , markedEdges.size());
+                        LOG.debug( "Batching {} edges for node {} for deletion", markedEdges.size(), node );
 
-                                final MutationBatch batch = keyspace.prepareMutationBatch();
+                        final MutationBatch batch = keyspace.prepareMutationBatch();
 
-                                for(MarkedEdge edge: markedEdges){
+                        Set<TargetPair> sourceNodes = new HashSet<TargetPair>( markedEdges.size() );
+                        Set<TargetPair> targetNodes = new HashSet<TargetPair>( markedEdges.size() );
 
-                                    //delete the newest edge <= the version on the node delete
-                                    LOG.debug( "Deleting edge {}", edge );
-                                    final MutationBatch delete = edgeSerialization.deleteEdge( scope,  edge );
+                        for ( MarkedEdge edge : markedEdges ) {
 
-                                    batch.mergeShallow( delete );
-                                }
+                            //delete the newest edge <= the version on the node delete
+                            final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
 
-                                try {
-                                    batch.execute();
-                                }
-                                catch ( ConnectionException e ) {
-                                    throw new RuntimeException( "Unable to delete edges", e );
-                                }
+                            batch.mergeShallow( delete );
 
-                                return Observable.from(markedEdges);
-                            }
-                        } )
-                        //TODO Fix this
-//        .flatMap( new Func1<MarkedEdge, Observable<MarkedEdge>>() {
-//            @Override
-//            public Observable<MarkedEdge> call( final MarkedEdge edge ) {
-//
-//
-//                return Observable.just( edge );
+                            sourceNodes.add( new TargetPair( edge.getSourceNode(), edge.getType() ) );
+                            targetNodes.add( new TargetPair( edge.getTargetNode(), edge.getType() ) );
+                        }
 
+                        try {
+                            batch.execute();
+                        }
+                        catch ( ConnectionException e ) {
+                            throw new RuntimeException( "Unable to delete edges", e );
+                        }
 
+                        //now  delete meta data
 
 
-//                //delete both the source and target meta data in parallel for the edge we deleted in the previous step
-//                //if nothing else is using them
-//                Observable<Integer> sourceMetaRepaired =
-//                        edgeMetaRepair.repairSources( scope, edge.getSourceNode(), edge.getType(), version );
-//
-//                Observable<Integer> targetMetaRepaired =
-//                        edgeMetaRepair.repairTargets( scope, edge.getTargetNode(), edge.getType(), version );
-//
-//                //sum up the number of subtypes we retain
-//                return Observable.concat( sourceMetaRepaired, targetMetaRepaired ).last()
-//                                 .map( new Func1<Integer, MarkedEdge>() {
-//                                     @Override
-//                                     public MarkedEdge call( final Integer integer ) {
-//
-//                                         LOG.debug( "Retained {} subtypes for edge {}", integer, edge );
+                        //delete both the source and target meta data in parallel for the edge we deleted in the
+                        // previous step
+                        //if nothing else is using them.  We purposefully do not schedule them on a new scheduler
+                        //we want them running on the i/o thread from the Observable emitting all the edges
+
 //
-//                                         return edge;
-//                                     }
-//                                 } );
+                        LOG.debug( "About to audit {} source types", sourceNodes.size() );
+
+                        Observable<Integer> sourceMetaCleanup = Observable.from( sourceNodes ).flatMap(
+                                new Func1<TargetPair, Observable<Integer>>() {
+                                    @Override
+                                    public Observable<Integer> call( final TargetPair targetPair ) {
+                                        return edgeMetaRepair
+                                                .repairSources( scope, targetPair.id, targetPair.edgeType, version );
+                                    }
+                                } ).last();
+
+
+                        LOG.debug( "About to audit {} target types", targetNodes.size() );
+
+                        Observable<Integer> targetMetaCleanup =  Observable.from( targetNodes ).flatMap( new Func1<TargetPair, Observable<Integer>>() {
+                            @Override
+                            public Observable<Integer> call( final TargetPair targetPair ) {
+                                return edgeMetaRepair
+                                        .repairTargets( scope, targetPair.id, targetPair.edgeType, version );
+                            }
+                        } ).last();
 
 
-//            }
-//        })
+                        //run both the source/target edge type cleanup, then proceed
+                        return Observable.merge( sourceMetaCleanup, targetMetaCleanup ).last().flatMap(  new Func1<Integer,
+                                Observable<MarkedEdge>>() {
+                            @Override
+                            public Observable<MarkedEdge> call( final Integer integer ) {
+                                return Observable.from( markedEdges );
+                            }
+                        } );
+                    }
+                } )
 
-    .count()
-                //if nothing is ever emitted, emit 0 so that we know no operations took place. Finally remove the
-                // target node in the mark
+                .count()
+                        //if nothing is ever emitted, emit 0 so that we know no operations took place. Finally remove
+                        // the
+                        // target node in the mark
                 .defaultIfEmpty( 0 ).doOnCompleted( new Action0() {
                     @Override
                     public void call() {
@@ -233,12 +238,12 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
      */
     private Observable<String> getEdgesTypesToTarget( final OrganizationScope scope, final SearchEdgeType search ) {
 
-        return Observable.create( new ObservableIterator<String>(  ) {
+        return Observable.create( new ObservableIterator<String>( "getEdgeTypesToTarget" ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
             }
-        } ).subscribeOn( Schedulers.io() );
+        } );
     }
 
 
@@ -247,12 +252,12 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
      */
     private Observable<String> getEdgesTypesFromSource( final OrganizationScope scope, final SearchEdgeType search ) {
 
-        return Observable.create( new ObservableIterator<String>(  ) {
+        return Observable.create( new ObservableIterator<String>( "getEdgeTypesFromSource" ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
             }
-        } ).subscribeOn( Schedulers.io() );
+        } );
     }
 
 
@@ -261,12 +266,12 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
      */
     private Observable<MarkedEdge> loadEdgesToTarget( final OrganizationScope scope, final SearchByEdgeType search ) {
 
-        return Observable.create( new ObservableIterator<MarkedEdge>(  ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgesToTarget" ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesToTarget( scope, search );
             }
-        } ).subscribeOn( Schedulers.io() );
+        } );
     }
 
 
@@ -275,11 +280,53 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
      */
     private Observable<MarkedEdge> loadEdgesFromSource( final OrganizationScope scope, final SearchByEdgeType search ) {
 
-        return Observable.create( new ObservableIterator<MarkedEdge>(  ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgesFromSource" ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesFromSource( scope, search );
             }
-        } ).subscribeOn( Schedulers.io() );
+        } );
+    }
+
+
+    private static class TargetPair {
+        protected final Id id;
+        protected final String edgeType;
+
+
+        private TargetPair( final Id id, final String edgeType ) {
+            this.id = id;
+            this.edgeType = edgeType;
+        }
+
+
+        @Override
+        public boolean equals( final Object o ) {
+            if ( this == o ) {
+                return true;
+            }
+            if ( o == null || getClass() != o.getClass() ) {
+                return false;
+            }
+
+            final TargetPair that = ( TargetPair ) o;
+
+            if ( !edgeType.equals( that.edgeType ) ) {
+                return false;
+            }
+            if ( !id.equals( that.id ) ) {
+                return false;
+            }
+
+            return true;
+        }
+
+
+        @Override
+        public int hashCode() {
+            int result = id.hashCode();
+            result = 31 * result + edgeType.hashCode();
+            return result;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
index 310fa1f..898a04d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
@@ -42,7 +42,6 @@ import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
-import rx.Scheduler;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
@@ -122,7 +121,7 @@ public abstract class AbstractEdgeRepair  {
      */
     private Observable<MarkedEdge> getEdgeVersions( final OrganizationScope scope, final Edge edge ) {
 
-        return Observable.create( new ObservableIterator<MarkedEdge>(  ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>( "edgeVersions" ) {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
index 8b81d0a..022548c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
@@ -47,7 +47,6 @@ import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
-import rx.Scheduler;
 import rx.functions.Action1;
 import rx.functions.Func1;
 import rx.observables.MathObservable;
@@ -123,7 +122,8 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
                             LOG.debug( "Checking for edges with nodeId {}, type {}, and subtype {}", node, edgeType, subType );
 
                             Observable<Integer> search =
-                                    serialization.loadEdges( scope, node, edgeType, subType, version ).take( 1 ).count()
+                                    //load each edge in it's own thread
+                                    serialization.loadEdges( scope, node, edgeType, subType, version ).subscribeOn( Schedulers.io() ).take( 1 ).count()
                                                  .doOnNext( new Action1<Integer>() {
 
                                                      @Override
@@ -145,7 +145,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
 
 
 
-                                                         LOG.debug( "No edges with nodeId {}, type {}, and subtype {}. Removing", node, edgeType, subType );
+                                                         LOG.debug( "No edges with nodeId {}, type {}, and subtype {}. Removing subtype.", node, edgeType, subType );
                                                          batch.mergeShallow( serialization
                                                                  .removeEdgeSubType( scope, node, edgeType, subType,
                                                                          version ) );
@@ -166,7 +166,10 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
                                                  public void call( final Integer count ) {
 
 
-                                                     LOG.debug( "Executing batch for subtype deletion with type {}.  Mutation has {} rows to mutate ", edgeType, batch.getRowCount() );
+                                                     LOG.debug(
+                                                             "Executing batch for subtype deletion with type {}.  " +
+                                                                     "Mutation has {} rows to mutate ",
+                                                             edgeType, batch.getRowCount() );
 
                                                      try {
                                                          batch.execute();
@@ -250,26 +253,26 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
         @Override
         public Observable<String> loadEdgeSubTypes( final OrganizationScope scope, final Id nodeId,
                                                     final String edgeType, final UUID version ) {
-            return Observable.create( new ObservableIterator<String>( ) {
+            return Observable.create( new ObservableIterator<String>( "edgeTargetIdTypes" ) {
                 @Override
                 protected Iterator<String> getIterator() {
                     return edgeMetadataSerialization
                             .getIdTypesToTarget( scope, new SimpleSearchIdType( nodeId, edgeType, null ) );
                 }
-            } ).subscribeOn( Schedulers.io() );
+            } );
         }
 
 
         @Override
         public Observable<MarkedEdge> loadEdges( final OrganizationScope scope, final Id nodeId, final String edgeType,
                                                  final String subType, final UUID version ) {
-            return Observable.create( new ObservableIterator<MarkedEdge>( ) {
+            return Observable.create( new ObservableIterator<MarkedEdge>( "edgeTargetSubTypes" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return edgeSerialization.getEdgesToTargetBySourceType( scope,
                             new SimpleSearchByIdType( nodeId, edgeType, version, subType, null ) );
                 }
-            } ).subscribeOn( Schedulers.io() );
+            } );
         }
 
 
@@ -295,26 +298,26 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
         @Override
         public Observable<String> loadEdgeSubTypes( final OrganizationScope scope, final Id nodeId,
                                                     final String edgeType, final UUID version ) {
-            return Observable.create( new ObservableIterator<String>( ) {
+            return Observable.create( new ObservableIterator<String>( "edgeSourceIdTypes" ) {
                 @Override
                 protected Iterator<String> getIterator() {
                     return edgeMetadataSerialization
                             .getIdTypesFromSource( scope, new SimpleSearchIdType( nodeId, edgeType, null ) );
                 }
-            } ).subscribeOn( Schedulers.io() );
+            } );
         }
 
 
         @Override
         public Observable<MarkedEdge> loadEdges( final OrganizationScope scope, final Id nodeId, final String edgeType,
                                                  final String subType, final UUID version ) {
-            return Observable.create( new ObservableIterator<MarkedEdge>( ) {
+            return Observable.create( new ObservableIterator<MarkedEdge>( "edgeSourceSubTypes" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return edgeSerialization.getEdgesFromSourceByTargetType( scope,
                             new SimpleSearchByIdType( nodeId, edgeType, version, subType, null ) );
                 }
-            } ).subscribeOn( Schedulers.io() );
+            } );
         }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java
deleted file mode 100644
index 92be7a7..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.impl.stage;
-
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.MarkedEdge;
-
-import rx.Observable;
-
-
-/**
- * Interface to perform repair operations on an edge when it is written
- */
-public interface EdgeWriteRepair {
-
-    /**
-     * Repair this edge.  Remove previous entries
-     * @param scope The scope to use
-     * @param edge The last edge to retain.  All versions  < this edge's version  will be deleted
-     *
-     * @return An observable that emits every version of the edge we delete.  Note that it may emit duplicates
-     * since this is a streaming API.
-     */
-    public Observable<MarkedEdge> repair( OrganizationScope scope, Edge edge );
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
deleted file mode 100644
index f77ef95..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.impl.stage;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.Keyspace;
-
-import rx.Observable;
-import rx.Scheduler;
-import rx.functions.Func1;
-
-
-/**
- * SimpleRepair operation
- */
-@Singleton
-public class EdgeWriteRepairImpl extends AbstractEdgeRepair implements EdgeWriteRepair {
-
-
-    @Inject
-    public EdgeWriteRepairImpl( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
-                                final Keyspace keyspace) {
-        super( edgeSerialization, graphFig, keyspace );
-    }
-
-
-    @Override
-    public Observable<MarkedEdge> repair( final OrganizationScope scope, final Edge edge ) {
-
-        return super.repair( scope, edge );
-    }
-
-
-    @Override
-    protected Func1<MarkedEdge, Boolean> getFilter( final UUID maxVersion ) {
-        return new Func1<MarkedEdge, Boolean>() {
-            /**
-             * We only want to return edges < this version so we remove them
-             * @param markedEdge
-             * @return
-             */
-            @Override
-            public Boolean call( final MarkedEdge markedEdge ) {
-                return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
index b230399..38893e2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
@@ -132,7 +132,7 @@ public class EdgeMetadataSerializationImpl implements EdgeMetadataSerialization,
         EdgeUtils.validateEdge( edge );
 
 
-        MutationBatch batch = keyspace.prepareMutationBatch();
+        MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() );
 
         final Id source = edge.getSourceNode();
         final Id target = edge.getTargetNode();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index 0341cac..790c386 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -299,7 +299,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
         final String type = search.getType();
         final UUID maxVersion = search.getMaxVersion();
 
-        return getEdges( GRAPH_SOURCE_NODE_EDGES, new EdgeSearcher<RowKey>( scope, search.last() ) {
+        return getEdges( GRAPH_SOURCE_NODE_EDGES, new EdgeSearcher<RowKey>( scope, maxVersion, search.last() ) {
 
 
             @Override
@@ -345,8 +345,9 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
         final Id sourceId = edgeType.getNode();
         final String type = edgeType.getType();
+        final UUID maxVersion = edgeType.getMaxVersion();
 
-        return getEdges( GRAPH_SOURCE_NODE_EDGES, new EdgeSearcher<RowKey>( scope, edgeType.last() ) {
+        return getEdges( GRAPH_SOURCE_NODE_EDGES, new EdgeSearcher<RowKey>( scope, maxVersion, edgeType.last() ) {
             @Override
             protected RowKey generateRowKey() {
                 return new RowKey( sourceId, type );
@@ -377,8 +378,9 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
         final Id targetId = edgeType.getNode();
         final String type = edgeType.getType();
         final String targetType = edgeType.getIdType();
+        final UUID maxVersion = edgeType.getMaxVersion();
 
-        return getEdges( GRAPH_SOURCE_NODE_TARGET_TYPE, new EdgeSearcher<RowKeyType>( scope, edgeType.last() ) {
+        return getEdges( GRAPH_SOURCE_NODE_TARGET_TYPE, new EdgeSearcher<RowKeyType>( scope, maxVersion, edgeType.last() ) {
             @Override
             protected RowKeyType generateRowKey() {
                 return new RowKeyType( targetId, type, targetType );
@@ -407,8 +409,9 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
         final Id targetId = edgeType.getNode();
         final String type = edgeType.getType();
+        final UUID maxVersion = edgeType.getMaxVersion();
 
-        return getEdges( GRAPH_TARGET_NODE_EDGES, new EdgeSearcher<RowKey>( scope, edgeType.last() ) {
+        return getEdges( GRAPH_TARGET_NODE_EDGES, new EdgeSearcher<RowKey>( scope, maxVersion, edgeType.last() ) {
 
 
             @Override
@@ -441,8 +444,9 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
         final Id targetId = edgeType.getNode();
         final String sourceType = edgeType.getIdType();
         final String type = edgeType.getType();
+        final UUID maxVersion = edgeType.getMaxVersion();
 
-        return getEdges( GRAPH_TARGET_NODE_SOURCE_TYPE, new EdgeSearcher<RowKeyType>( scope, edgeType.last() ) {
+        return getEdges( GRAPH_TARGET_NODE_SOURCE_TYPE, new EdgeSearcher<RowKeyType>( scope, maxVersion, edgeType.last()) {
             @Override
             protected RowKeyType generateRowKey() {
                 return new RowKeyType( targetId, type, sourceType );
@@ -672,11 +676,13 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
     private static abstract class EdgeSearcher<R> implements ColumnParser<DirectedEdge, MarkedEdge> {
 
         protected final Optional<Edge> last;
+        protected final UUID maxVersion;
         protected final OrganizationScope scope;
 
 
-        protected EdgeSearcher( final OrganizationScope scope, final Optional<Edge> last ) {
+        protected EdgeSearcher( final OrganizationScope scope, final UUID maxVersion , final Optional<Edge> last) {
             this.scope = scope;
+            this.maxVersion = maxVersion;
             this.last = last;
         }
 
@@ -692,7 +698,11 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
 
                 builder.setStart( sourceEdge, EDGE_SERIALIZER );
+            }else{
+
+
             }
+
         }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ColumnNameIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ColumnNameIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ColumnNameIterator.java
index b6d17c6..1287dbb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ColumnNameIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ColumnNameIterator.java
@@ -4,6 +4,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.parse;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.query.RowQuery;
 import com.netflix.hystrix.HystrixCommand;
@@ -87,17 +88,11 @@ public class ColumnNameIterator<C, T> implements Iterable<T>, Iterator<T> {
     private void advanceIterator() {
 
         //run producing the values within a hystrix command.  This way we'll time out if the read takes too long
-        sourceIterator = new HystrixCommand<Iterator<Column<C>>>( HystrixCommand.Setter.withGroupKey( GROUP_KEY )
-                                                                                .andCommandPropertiesDefaults(
-                                                                                        HystrixCommandProperties
-                                                                                                .Setter()
-                                                                                                .withExecutionIsolationThreadTimeoutInMilliseconds(
-                                                                                                        executionTimeout ) ) ) {
-
-            @Override
-            protected Iterator<Column<C>> run() throws Exception {
-                return rowQuery.execute().getResult().iterator();
-            }
-        }.execute();
+        try {
+            sourceIterator = rowQuery.execute().getResult().iterator();
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to get next page", e );
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
index 2d2456b..a222bac 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
@@ -3,15 +3,11 @@ package org.apache.usergrid.persistence.graph.serialization.impl.parse;
 
 import java.util.Iterator;
 
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import rx.Observable;
-import rx.Observer;
 import rx.Subscriber;
-import rx.Subscription;
-import rx.subscriptions.Subscriptions;
 
 
 /**
@@ -23,6 +19,14 @@ public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T>
 
     private static final Logger log = LoggerFactory.getLogger( ObservableIterator.class );
 
+    private final String name;
+
+
+    /**
+     * @param name  The simple name of the iterator, used for debugging
+     */
+    protected ObservableIterator( final String name ) {this.name = name;}
+
 
     @Override
     public void call( final Subscriber<? super T> subscriber ) {
@@ -37,11 +41,15 @@ public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T>
             while ( itr.hasNext() && !subscriber.isUnsubscribed()) {
                 final T next = itr.next();
 
-                log.debug( "Emitting {}", next );
+                log.trace( "Iterator '{}' emitting item '{}'",  name, next );
+
+                assert next != null;
 
                 subscriber.onNext( next );
             }
 
+
+
             subscriber.onCompleted();
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
index 48890ca..14ab9c4 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
@@ -54,8 +54,8 @@ public class NodeDeleteListenerTest {
 
     private static final Logger log = LoggerFactory.getLogger( NodeDeleteListenerTest.class );
 
-    @ClassRule
-    public static CassandraRule rule = new CassandraRule();
+//    @ClassRule
+//    public static CassandraRule rule = new CassandraRule();
 
 
     @Inject
@@ -317,13 +317,13 @@ public class NodeDeleteListenerTest {
      * since it has no other targets
      */
     @Test
-    public void testMultiDelete() throws ConnectionException {
+    public void testMultiDelete() throws ConnectionException, InterruptedException {
 
         GraphManager em = emf.createEdgeManager( scope );
 
 
         //create loads of edges to easily delete.  We'll keep all the types of "test"
-        final int edgeCount = graphFig.getScanPageSize() * 4;
+        final int edgeCount = graphFig.getScanPageSize() * 2;
         Id toDelete = createId( "toDelete" );
         final String edgeType = "test";
 
@@ -360,9 +360,10 @@ public class NodeDeleteListenerTest {
         log.info( "Saved {} target edges", targetCount );
 
 
-        //mark the node so
-        UUID deleteVersion = UUIDGenerator.newTimeUUID();
+        //mark the node for deletion
+//        UUID deleteVersion = UUIDGenerator.newTimeUUID();
 
+        UUID deleteVersion = UUID.fromString( "ffffffff-ffff-1fff-bfff-ffffffffffff" );
 
         nodeSerialization.mark( scope, toDelete, deleteVersion ).execute();
 
@@ -371,6 +372,7 @@ public class NodeDeleteListenerTest {
 
         int count = deleteListener.receive( deleteEvent ).toBlockingObservable().last();
 
+        //TODO T.N. THIS SHOULD WORK!!!!  It fails intermittently with RX 0.17.1 with too many scheduler threads (which was wrong), try this again after the next release
         assertEquals( edgeCount, count );
 
         //now verify we can't get any of the info back
@@ -378,17 +380,19 @@ public class NodeDeleteListenerTest {
         UUID now = UUIDGenerator.newTimeUUID();
 
 
-        Iterator<MarkedEdge> returned =
+           //validate it's not returned by the
+
+        Iterator<MarkedEdge>  returned = edgeSerialization.getEdgesToTarget( scope, createSearchByEdge( toDelete, edgeType, now, null ) );
+
+        assertFalse( "No target should be returned", returned.hasNext() );
+
+        returned =
                 edgeSerialization.getEdgesFromSource( scope, createSearchByEdge( toDelete, edgeType, now, null ) );
 
         //no edge from source node should be returned
         assertFalse( "No source should be returned", returned.hasNext() );
 
-        //validate it's not returned by the
-
-        returned = edgeSerialization.getEdgesToTarget( scope, createSearchByEdge( toDelete, edgeType, now, null ) );
 
-        assertFalse( "No target should be returned", returned.hasNext() );
 
 
         //no types from source

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
deleted file mode 100644
index c7c95a1..0000000
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.impl.stage;
-
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.jukito.JukitoRunner;
-import org.jukito.UseModules;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
-import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
-import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
-import com.google.inject.Inject;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
-import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-/**
- *
- *
- */
-@RunWith(JukitoRunner.class)
-@UseModules({ TestGraphModule.class })
-public class EdgeWriteRepairTest {
-
-    private static final Logger LOG = LoggerFactory.getLogger( EdgeWriteRepairTest.class );
-
-    @ClassRule
-    public static CassandraRule rule = new CassandraRule();
-
-
-    @Inject
-    @Rule
-    public MigrationManagerRule migrationManagerRule;
-
-
-    @Inject
-    protected EdgeSerialization edgeSerialization;
-
-    @Inject
-    protected EdgeWriteRepair edgeWriteRepair;
-
-
-    protected OrganizationScope scope;
-
-
-    @Before
-    public void setup() {
-        scope = mock( OrganizationScope.class );
-
-        Id orgId = mock( Id.class );
-
-        when( orgId.getType() ).thenReturn( "organization" );
-        when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
-
-        when( scope.getOrganization() ).thenReturn( orgId );
-    }
-
-
-    /**
-     * Test repairing with no edges
-     */
-    @Test
-    public void noEdges() {
-        Edge edge = createEdge( "source", "test", "target" );
-
-        Iterator<MarkedEdge> edges = edgeWriteRepair.repair( scope, edge ).toBlockingObservable().getIterator();
-
-        assertFalse( "No edges cleaned", edges.hasNext() );
-    }
-
-
-    /**
-     * Test repairing with no edges TODO: TN.  There appears to be a race condition here with ordering.  Not sure if
-     * this is intentional as part of the impl or if it's an issue
-     */
-    @Test
-    public void versionTest() throws ConnectionException {
-        final int size = 20;
-
-        final List<Edge> versions = new ArrayList<Edge>( size );
-
-        final Id sourceId = createId( "source" );
-        final Id targetId = createId( "target" );
-        final String edgeType = "edge";
-
-        int deleteIndex = size / 2;
-
-        Set<Edge> deletedEdges = new HashSet<Edge>();
-
-        for ( int i = 0; i < size; i++ ) {
-            final Edge edge = createEdge( sourceId, edgeType, targetId );
-
-            versions.add( edge );
-
-            edgeSerialization.writeEdge( scope, edge ).execute();
-
-            LOG.info( "Writing edge at index [{}] {}", i, edge );
-
-            if ( i < deleteIndex ) {
-                deletedEdges.add( edge );
-            }
-        }
-
-
-        Edge keep = versions.get( deleteIndex );
-
-        Iterable<MarkedEdge> edges = edgeWriteRepair.repair( scope, keep ).toBlockingObservable().toIterable();
-
-        Multiset<Edge> deletedStream = HashMultiset.create();
-
-        for ( MarkedEdge edge : edges ) {
-
-            LOG.info( "Returned edge {} for repair", edge );
-
-            final boolean shouldBeDeleted = deletedEdges.contains( edge );
-
-            assertTrue( "Removed matches saved index", shouldBeDeleted );
-
-            deletedStream.add( edge );
-        }
-
-        deletedEdges.removeAll( deletedStream.elementSet() );
-
-        assertEquals( 0, deletedEdges.size() );
-
-        //now verify we get all the versions we expect back
-        Iterator<MarkedEdge> iterator = edgeSerialization.getEdgeVersions( scope,
-                new SimpleSearchByEdge( sourceId, edgeType, targetId, UUIDGenerator.newTimeUUID(), null ) );
-
-        int count = 0;
-
-        for ( MarkedEdge edge : new IterableWrapper<MarkedEdge>( iterator ) ) {
-
-            final Edge saved = versions.get( size - count - 1 );
-
-            assertEquals( saved, edge );
-
-            count++;
-        }
-
-        final int keptCount = size - deleteIndex;
-
-        assertEquals( "Kept edge version was the minimum", keptCount, count );
-    }
-
-
-    private class IterableWrapper<T> implements Iterable<T> {
-        private final Iterator<T> sourceIterator;
-
-
-        private IterableWrapper( final Iterator<T> sourceIterator ) {
-            this.sourceIterator = sourceIterator;
-        }
-
-
-        @Override
-        public Iterator<T> iterator() {
-            return this.sourceIterator;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/TestCount.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/TestCount.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/TestCount.java
index 7af55da..2548864 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/TestCount.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/TestCount.java
@@ -6,6 +6,8 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import rx.Observable;
 import rx.Subscriber;
@@ -21,14 +23,27 @@ import static org.junit.Assert.assertEquals;
  */
 public class TestCount {
 
+    private static final Logger log = LoggerFactory.getLogger(TestCount.class);
 
     @Test
     public void mergeTest(){
 
         final int sizePerObservable = 2000;
 
-        Observable<Integer> input1 = getObservables( sizePerObservable );
-        Observable<Integer> input2 = getObservables( sizePerObservable );
+        Observable<Integer> input1 = getObservables( sizePerObservable ).flatMap( new Func1<Integer, Observable<?
+                extends Integer>>() {
+            @Override
+            public Observable<? extends Integer> call( final Integer integer ) {
+                return getObservables( 100 );
+            }
+        } );
+        Observable<Integer> input2 = getObservables( sizePerObservable ).flatMap( new Func1<Integer, Observable<?
+                extends Integer>>() {
+            @Override
+            public Observable<? extends Integer> call( final Integer integer ) {
+                return getObservables( 100 );
+            }
+        } );
 
        int returned =  Observable.merge(input1, input2).buffer( 1000 ).flatMap(
                new Func1<List<Integer>, Observable<Integer>>() {
@@ -50,7 +65,7 @@ public class TestCount {
                } ).count().defaultIfEmpty( 0 ).toBlockingObservable().last();
 
 
-        assertEquals("Count was correct", sizePerObservable*2, returned);
+        assertEquals("Count was correct", sizePerObservable*2*100, returned);
     }
 
 
@@ -92,19 +107,12 @@ public class TestCount {
                         }
                     }
 
-                    //Sleep for a very long time before emitting the last value
-                    if(i == size -1){
-                        try {
-                            Thread.sleep(5000);
-                        }
-                        catch ( InterruptedException e ) {
-                            subscriber.onError( e );
-                            return;
-                        }
-                    }
+                    final Integer value = values.get( i );
+
+                    log.info( "Emitting {}", value  );
 
 
-                    subscriber.onNext( values.get( i ) );
+                    subscriber.onNext( value );
                 }
 
                 subscriber.onCompleted();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/resources/log4j.properties b/stack/corepersistence/graph/src/test/resources/log4j.properties
new file mode 100644
index 0000000..849b280
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/resources/log4j.properties
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# suppress inspection "UnusedProperty" for whole file
+log4j.rootLogger=INFO,stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %c{3}.%M(%L)<%t>- %m%n
+
+log4j.logger.org.safehaus.chop.plugin=DEBUG
+log4j.logger.org.safehaus.guicyfig=ERROR
+log4j.logger.org.safehaus.chop.api.store.amazon=DEBUG
+log4j.logger.org.apache.http=ERROR
+log4j.logger.com.amazonaws.request=ERROR
+log4j.logger.cassandra.db=ERROR
+
+log4j.logger.org.apache.usergrid=DEBUG
+
+log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.parse=TRACE

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 9b7b1e1..9a9b7d3 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -40,6 +40,7 @@
         <surefire.version>2.16</surefire.version>
         <rx.version>0.17.1</rx.version>
         <hystrix.version>1.4.0-RC1</hystrix.version>
+        <cassandra.version>1.2.15</cassandra.version>
 
     </properties>