You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/18 21:42:12 UTC

[1/3] git commit: Upgraded All external calls to use Hystrix.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-188 72684dfca -> c69c1974a


Upgraded All external calls to use Hystrix.

Added shard compaction tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cef3de96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cef3de96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cef3de96

Branch: refs/heads/USERGRID-188
Commit: cef3de96c0a38b22420935c941bba422a32310b6
Parents: 72684df
Author: Todd Nine <to...@apache.org>
Authored: Thu Aug 14 17:28:40 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Aug 14 17:28:40 2014 -0600

----------------------------------------------------------------------
 .../collection/guice/TestModule.java            |   2 +
 .../core/hystrix/HystrixCassandra.java          |  63 +---
 .../core/cassandra/CassandraRule.java           |  15 +-
 .../persistence/graph/guice/GraphModule.java    |  52 +--
 .../graph/guice/StorageEdgeSerialization.java   |  34 --
 .../graph/impl/GraphManagerImpl.java            |  48 +--
 .../graph/impl/stage/EdgeDeleteRepairImpl.java  |  11 +-
 .../graph/impl/stage/EdgeMetaRepairImpl.java    |  56 ++-
 .../impl/stage/NodeDeleteListenerImpl.java      |  19 +-
 .../impl/NodeSerializationImpl.java             |  70 ++--
 .../NodeShardCounterSerializationImpl.java      |  17 +-
 .../shard/impl/NodeShardAllocationImpl.java     |  71 ++--
 .../graph/GraphManagerShardConsistencyIT.java   | 337 +++++++++++++++++++
 .../graph/StorageGraphManagerIT.java            |   2 +-
 .../graph/impl/EdgeDeleteListenerTest.java      |   4 +-
 .../graph/impl/NodeDeleteListenerTest.java      |   2 -
 .../graph/impl/stage/EdgeDeleteRepairTest.java  |   2 -
 .../graph/impl/stage/EdgeMetaRepairTest.java    |   2 -
 .../EdgeSerializationChopTest.java              |   2 -
 .../PermanentSerializationTest.java             |   2 -
 .../src/test/resources/usergrid-UNIT.properties |   1 +
 stack/corepersistence/pom.xml                   |   2 +-
 22 files changed, 493 insertions(+), 321 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestModule.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestModule.java
index a049b4d..44a8fe3 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestModule.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestModule.java
@@ -37,7 +37,9 @@ public abstract class TestModule extends AbstractModule {
 
         try {
             //load up the properties
+            ConfigurationManager.getDeploymentContext().setDeploymentEnvironment( "UNIT" );
             ConfigurationManager.loadCascadedPropertiesFromResources( "usergrid" );
+
         }
         catch ( IOException e ) {
             throw new RuntimeException( "Cannot do much without properly loading our configuration.", e );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
index d503a6c..7758651 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.persistence.core.hystrix;
 
 
+import com.netflix.astyanax.Execution;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.OperationResult;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
@@ -27,6 +28,7 @@ import com.netflix.astyanax.model.ColumnList;
 import com.netflix.astyanax.query.RowQuery;
 import com.netflix.hystrix.HystrixCommand;
 import com.netflix.hystrix.HystrixCommandGroupKey;
+import com.netflix.hystrix.HystrixThreadPoolProperties;
 
 
 /**
@@ -36,6 +38,7 @@ public class HystrixCassandra {
 
 
 
+
     /**
      * Command group used for realtime user commands
      */
@@ -48,17 +51,17 @@ public class HystrixCassandra {
 
 
     /**
-     * Execute an user mutation
+     * Execute an user operation
      */
-    public static OperationResult<Void> userMutation( final MutationBatch batch ) {
+    public static <R> OperationResult<R> user( final Execution<R> execution) {
 
 
-        return new HystrixCommand<OperationResult<Void>>( USER_GROUP ) {
+        return new HystrixCommand<OperationResult<R>>( USER_GROUP ) {
 
             @Override
-            protected OperationResult<Void> run() {
+            protected OperationResult<R> run() {
                 try {
-                    return batch.execute();
+                    return  execution.execute();
                 }
                 catch ( ConnectionException e ) {
                     throw new RuntimeException( e );
@@ -69,17 +72,17 @@ public class HystrixCassandra {
 
 
     /**
-     * Execute an user mutation
+     * Execute an an async operation
      */
-    public static <K, C> OperationResult<ColumnList<C>> userQuery( final RowQuery<K, C> query ) {
+    public static <R> OperationResult<R> async( final Execution<R> execution) {
 
 
-        return new HystrixCommand<OperationResult<ColumnList<C>>>( USER_GROUP ) {
+        return new HystrixCommand<OperationResult<R>>( ASYNC_GROUP ) {
 
             @Override
-            protected OperationResult<ColumnList<C>> run() {
+            protected OperationResult<R> run() {
                 try {
-                    return query.execute();
+                    return  execution.execute();
                 }
                 catch ( ConnectionException e ) {
                     throw new RuntimeException( e );
@@ -89,44 +92,4 @@ public class HystrixCassandra {
     }
 
 
-    /**
-     * Execute an asynchronous mutation
-     */
-    public static OperationResult<Void> asyncMutation( final MutationBatch batch ) {
-
-
-        return new HystrixCommand<OperationResult<Void>>( ASYNC_GROUP ) {
-
-            @Override
-            protected OperationResult<Void> run() {
-                try {
-                    return batch.execute();
-                }
-                catch ( ConnectionException e ) {
-                    throw new RuntimeException( e );
-                }
-            }
-        }.execute();
-    }
-
-
-    /**
-     * Execute an asynchronous query
-     */
-    public static <K, C> OperationResult<ColumnList<C>> asyncQuery( final RowQuery<K, C> query ) {
-
-
-        return new HystrixCommand<OperationResult<ColumnList<C>>>( ASYNC_GROUP ) {
-
-            @Override
-            protected OperationResult<ColumnList<C>> run() {
-                try {
-                    return query.execute();
-                }
-                catch ( ConnectionException e ) {
-                    throw new RuntimeException( e );
-                }
-            }
-        }.execute();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/cassandra/CassandraRule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/cassandra/CassandraRule.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/cassandra/CassandraRule.java
index caf89c5..ee89e0f 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/cassandra/CassandraRule.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/cassandra/CassandraRule.java
@@ -5,6 +5,8 @@ import com.google.common.io.Files;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.netflix.astyanax.test.EmbeddedCassandra;
+import com.netflix.config.ConfigurationManager;
+
 import java.io.File;
 import java.io.IOException;
 import org.apache.cassandra.io.util.FileUtils;
@@ -32,9 +34,19 @@ public class CassandraRule extends EnvironResource {
 
     private final CassandraFig cassandraFig;
 
+    static{
+        ConfigurationManager.getDeploymentContext().setDeploymentEnvironment( "UNIT" );
+        try {
+            ConfigurationManager.loadCascadedPropertiesFromResources( "usergrid" );
+        }
+        catch ( IOException e ) {
+            throw new RuntimeException(e);
+        }
+    }
 
     public CassandraRule() {
         super( Env.UNIT );
+
         Injector injector = Guice.createInjector( new GuicyFigModule( CassandraFig.class ) );
         cassandraFig = injector.getInstance( CassandraFig.class );
     }
@@ -43,7 +55,8 @@ public class CassandraRule extends EnvironResource {
     protected void before() throws Throwable {
 
         if ( !cassandraFig.isEmbedded()) {
-            LOG.info("Using external Cassandra"); 
+            LOG.info("Using external Cassandra");
+            return;
         }
 
         if ( started ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index ca7c270..64f0fbb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -21,7 +21,6 @@ package org.apache.usergrid.persistence.graph.guice;
 
 import org.safehaus.guicyfig.GuicyFigModule;
 
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.consistency.TimeServiceImpl;
 import org.apache.usergrid.persistence.core.migration.Migration;
@@ -61,13 +60,9 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.Sizeb
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeShardStrategy;
 
 import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
 import com.google.inject.Key;
-import com.google.inject.Provides;
-import com.google.inject.Singleton;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.multibindings.Multibinder;
-import com.netflix.astyanax.Keyspace;
 
 
 public class GraphModule extends AbstractModule {
@@ -116,7 +111,13 @@ public class GraphModule extends AbstractModule {
         bind( NodeDeleteListener.class ).to( NodeDeleteListenerImpl.class );
         bind( EdgeDeleteListener.class ).to( EdgeDeleteListenerImpl.class );
 
+        bind( EdgeSerialization.class ).to( EdgeSerializationImpl.class );
 
+        bind( EdgeShardStrategy.class ).to( SizebasedEdgeShardStrategy.class );
+
+        bind( ShardedEdgeSerialization.class ).to( ShardedEdgeSerializationImpl.class );
+
+        bind( EdgeColumnFamilies.class ).to( SizebasedEdgeColumnFamilies.class );
 
 
         /**
@@ -134,50 +135,11 @@ public class GraphModule extends AbstractModule {
         migrationBinding.addBinding().to( Key.get( EdgeMetadataSerialization.class ) );
 
         //bind each singleton to the multi set.  Otherwise we won't migrate properly
-        migrationBinding.addBinding().to( Key.get( EdgeColumnFamilies.class, StorageEdgeSerialization.class ) );
+        migrationBinding.addBinding().to( Key.get( EdgeColumnFamilies.class ) );
 
         migrationBinding.addBinding().to( Key.get( EdgeShardSerialization.class ) );
         migrationBinding.addBinding().to( Key.get( NodeShardCounterSerialization.class ) );
     }
-
-
-    /**
-     * Our permanent serialization strategy
-     */
-    @Provides
-    @Singleton
-    @Inject
-    @StorageEdgeSerialization
-    public EdgeSerialization storageSerialization( final NodeShardCache cache, final Keyspace keyspace,
-                                                   final CassandraConfig cassandraConfig, final GraphFig graphFig,
-                                                   final NodeShardApproximation shardApproximation,
-                                                   final TimeService timeService,
-                                                   @StorageEdgeSerialization
-                                                   final EdgeColumnFamilies edgeColumnFamilies ) {
-
-        final EdgeShardStrategy sizeBasedStrategy = new SizebasedEdgeShardStrategy( cache, shardApproximation );
-
-
-        final ShardedEdgeSerialization serialization = new ShardedEdgeSerializationImpl(keyspace, cassandraConfig, graphFig, sizeBasedStrategy,
-                timeService );
-
-
-        final EdgeSerializationImpl edgeSerialization =
-                new EdgeSerializationImpl( keyspace, cassandraConfig, graphFig, sizeBasedStrategy, edgeColumnFamilies,
-                        serialization );
-
-
-        return edgeSerialization;
-    }
-
-
-    @Provides
-    @Singleton
-    @Inject
-    @StorageEdgeSerialization
-    public EdgeColumnFamilies storageSerializationColumnFamilies() {
-        return new SizebasedEdgeColumnFamilies();
-    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/StorageEdgeSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/StorageEdgeSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/StorageEdgeSerialization.java
deleted file mode 100644
index 267be03..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/StorageEdgeSerialization.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.guice;
-
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
-@BindingAnnotation
-public @interface StorageEdgeSerialization {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 473f49e..ae38372 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -28,6 +28,7 @@ import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -40,7 +41,6 @@ import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.SearchByIdType;
 import org.apache.usergrid.persistence.graph.SearchEdgeType;
 import org.apache.usergrid.persistence.graph.SearchIdType;
-import org.apache.usergrid.persistence.graph.guice.StorageEdgeSerialization;
 import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteListener;
 import org.apache.usergrid.persistence.graph.impl.stage.NodeDeleteListener;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
@@ -82,7 +82,6 @@ public class GraphManagerImpl implements GraphManager {
     private final EdgeDeleteListener edgeDeleteListener;
     private final NodeDeleteListener nodeDeleteListener;
 
-    private Observer<Integer> edgeWriteSubcriber;
     private Observer<Integer> edgeDeleteSubcriber;
     private Observer<Integer> nodeDelete;
 
@@ -92,7 +91,7 @@ public class GraphManagerImpl implements GraphManager {
 
     @Inject
     public GraphManagerImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
-                             @StorageEdgeSerialization final EdgeSerialization storageEdgeSerialization,
+                             final EdgeSerialization storageEdgeSerialization,
                              final NodeSerialization nodeSerialization, final GraphFig graphFig,
                              @Assisted final ApplicationScope scope, final EdgeDeleteListener edgeDeleteListener,
                              final NodeDeleteListener nodeDeleteListener ) {
@@ -113,7 +112,6 @@ public class GraphManagerImpl implements GraphManager {
         this.edgeDeleteListener = edgeDeleteListener;
         this.nodeDeleteListener = nodeDeleteListener;
 
-        this.edgeWriteSubcriber = MetricSubscriber.INSTANCE;
         this.edgeDeleteSubcriber = MetricSubscriber.INSTANCE;
         this.nodeDelete = MetricSubscriber.INSTANCE;
     }
@@ -138,14 +136,7 @@ public class GraphManagerImpl implements GraphManager {
 
                 mutation.mergeShallow( edgeMutation );
 
-
-                try {
-                    LOG.debug( "Writing edge {} to metadata and commit log", edge );
-                    mutation.execute();
-                }
-                catch ( ConnectionException e ) {
-                    throw new RuntimeException( "Unable to connect to cassandra", e );
-                }
+                HystrixCassandra.user( mutation );
 
                 return edge;
             }
@@ -170,13 +161,8 @@ public class GraphManagerImpl implements GraphManager {
                 final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge( scope, edge, timestamp );
 
 
-                try {
-                    LOG.debug( "Marking edge {} as deleted to commit log", edge );
-                    edgeMutation.execute();
-                }
-                catch ( ConnectionException e ) {
-                    throw new RuntimeException( "Unable to connect to cassandra", e );
-                }
+                LOG.debug( "Marking edge {} as deleted to commit log", edge );
+                HystrixCassandra.user( edgeMutation );
 
 
                 //HystrixCassandra.async( edgeDeleteListener.receive( scope, markedEdge,
@@ -205,13 +191,9 @@ public class GraphManagerImpl implements GraphManager {
                 final MutationBatch nodeMutation = nodeSerialization.mark( scope, id, timestamp );
 
 
-                try {
-                    LOG.debug( "Marking node {} as deleted to node mark", node );
-                    nodeMutation.execute();
-                }
-                catch ( ConnectionException e ) {
-                    throw new RuntimeException( "Unable to connect to cassandra", e );
-                }
+                LOG.debug( "Marking node {} as deleted to node mark", node );
+                HystrixCassandra.user( nodeMutation );
+
 
                 //HystrixCassandra.async(nodeDeleteListener.receive(scope, id, eventTimestamp  )).subscribeOn(
                 // Schedulers.io() ).subscribe( nodeDelete );
@@ -349,7 +331,7 @@ public class GraphManagerImpl implements GraphManager {
          * used in conjunction with the max version filter to filter any edges that should not be returned
          *
          * @return An observable that emits only edges that can be consumed.  There could be multiple versions of the
-         *         same edge so those need de-duped.
+         * same edge so those need de-duped.
          */
         @Override
         public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
@@ -409,18 +391,6 @@ public class GraphManagerImpl implements GraphManager {
         }
     }
 
-    /**
-     * Used for testing and callback hooks.  TODO: Refactor
-     */
-
-    /**
-     * Set the subcription for the edge write
-     */
-    public void setEdgeWriteSubcriber( final Observer<Integer> edgeWriteSubcriber ) {
-        Preconditions.checkNotNull( edgeWriteSubcriber, "Subscriber cannot be null" );
-        this.edgeWriteSubcriber = edgeWriteSubcriber;
-    }
-
 
     /**
      * Set the subscription for the edge delete

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
index 6eb7a85..5be3541 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
@@ -26,13 +26,13 @@ import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
-import org.apache.usergrid.persistence.graph.guice.StorageEdgeSerialization;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
 
@@ -61,7 +61,7 @@ public class EdgeDeleteRepairImpl implements EdgeDeleteRepair {
 
 
     @Inject
-    public EdgeDeleteRepairImpl( @StorageEdgeSerialization final EdgeSerialization storageSerialization,
+    public EdgeDeleteRepairImpl( final EdgeSerialization storageSerialization,
                                  final GraphFig graphFig, final Keyspace keyspace ) {
 
         Preconditions.checkNotNull( "storageSerialization is required", storageSerialization );
@@ -95,12 +95,7 @@ public class EdgeDeleteRepairImpl implements EdgeDeleteRepair {
 
 
                                     //remove from storage
-                                    try {
-                                        storageSerialization.deleteEdge( scope, edge, timestamp ).execute();
-                                    }
-                                    catch ( ConnectionException e ) {
-                                        throw new GraphRuntimeException( "Unable to remove edge from storage", e );
-                                    }
+                                    HystrixCassandra.async(storageSerialization.deleteEdge( scope, edge, timestamp ));
                                 }
                             }
                         } );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
index a0769f5..055b867 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
@@ -27,13 +27,13 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
-import org.apache.usergrid.persistence.graph.guice.StorageEdgeSerialization;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
@@ -72,8 +72,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
 
     @Inject
     public EdgeMetaRepairImpl( final EdgeMetadataSerialization edgeMetadataSerialization, final Keyspace keyspace,
-                               final GraphFig graphFig,
-                               @StorageEdgeSerialization final EdgeSerialization storageEdgeSerialization ) {
+                               final GraphFig graphFig, final EdgeSerialization storageEdgeSerialization ) {
 
 
         Preconditions.checkNotNull( "edgeMetadataSerialization is required", edgeMetadataSerialization );
@@ -115,9 +114,9 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
         Preconditions.checkNotNull( serialization, "serialization is required" );
 
 
-        Observable<Integer> deleteCounts = serialization.loadEdgeSubTypes( scope, node, edgeType, maxTimestamp )
-                .buffer( graphFig.getRepairConcurrentSize() )
-                        //buffer them into concurrent groups based on the concurrent repair size
+        Observable<Integer> deleteCounts = serialization.loadEdgeSubTypes( scope, node, edgeType, maxTimestamp ).buffer(
+                graphFig.getRepairConcurrentSize() )
+                //buffer them into concurrent groups based on the concurrent repair size
                 .flatMap( new Func1<List<String>, Observable<Integer>>() {
 
                     @Override
@@ -154,15 +153,15 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
                                                           **/
                                                          if ( count != 0 ) {
                                                              LOG.debug( "Found edge with nodeId {}, type {}, "
-                                                                     + "and subtype {}. Not removing subtype. ", node,
-                                                                     edgeType, subType );
+                                                                             + "and subtype {}. Not removing subtype. ",
+                                                                     node, edgeType, subType );
                                                              return;
                                                          }
 
 
                                                          LOG.debug( "No edges with nodeId {}, type {}, "
-                                                                 + "and subtype {}. Removing subtype.", node, edgeType,
-                                                                 subType );
+                                                                         + "and subtype {}. Removing subtype.", node,
+                                                                 edgeType, subType );
                                                          batch.mergeShallow( serialization
                                                                  .removeEdgeSubType( scope, node, edgeType, subType,
                                                                          maxTimestamp ) );
@@ -179,22 +178,22 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
                          */
                         return MathObservable.sumInteger( Observable.merge( checks ) )
                                              .doOnNext( new Action1<Integer>() {
-                                                 @Override
-                                                 public void call( final Integer count ) {
+                                                            @Override
+                                                            public void call( final Integer count ) {
 
 
-                                                     LOG.debug( "Executing batch for subtype deletion with type {}.  "
-                                                             + "Mutation has {} rows to mutate ", edgeType,
-                                                             batch.getRowCount() );
+                                                                LOG.debug(
+                                                                        "Executing batch for subtype deletion with " +
+                                                                                "type {}.  "
+                                                                                + "Mutation has {} rows to mutate ",
+                                                                        edgeType, batch.getRowCount() );
 
-                                                     try {
-                                                         batch.execute();
-                                                     }
-                                                     catch ( ConnectionException e ) {
-                                                         throw new GraphRuntimeException( "Unable to execute mutation", e );
-                                                     }
-                                                 }
-                                             } );
+                                                                HystrixCassandra.async( batch );
+                                                            }
+                                                        }
+
+
+                                                      );
                     }
                 } )
                         //if we get no edges, emit a 0 so the caller knows we can delete the type
@@ -217,15 +216,10 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
                     return;
                 }
 
-                try {
 
-                    LOG.debug( "Type {} has no subtypes in use as of maxTimestamp {}.  Deleting type.", edgeType,
-                            maxTimestamp );
-                    serialization.removeEdgeType( scope, node, edgeType, maxTimestamp ).execute();
-                }
-                catch ( ConnectionException e ) {
-                    throw new GraphRuntimeException( "Unable to execute mutation" );
-                }
+                LOG.debug( "Type {} has no subtypes in use as of maxTimestamp {}.  Deleting type.", edgeType,
+                        maxTimestamp );
+                HystrixCassandra.async( serialization.removeEdgeType( scope, node, edgeType, maxTimestamp ) );
             }
         } );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
index d7cf742..962da21 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
@@ -28,13 +28,13 @@ import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchEdgeType;
 import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
-import org.apache.usergrid.persistence.graph.guice.StorageEdgeSerialization;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
@@ -77,7 +77,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
     public NodeDeleteListenerImpl( final NodeSerialization nodeSerialization,
                                    final EdgeMetadataSerialization edgeMetadataSerialization,
                                    final EdgeMetaRepair edgeMetaRepair, final GraphFig graphFig,
-                                   @StorageEdgeSerialization final EdgeSerialization storageSerialization,
+                                   final EdgeSerialization storageSerialization,
                                    final Keyspace keyspace ) {
 
 
@@ -129,13 +129,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
                                 .doOnCompleted( new Action0() {
                                     @Override
                                     public void call() {
-                                        try {
-                                            nodeSerialization.delete( scope, node, maxVersion.get() ).execute();
-                                        }
-                                        catch ( ConnectionException e ) {
-                                            throw new GraphRuntimeException( "Unable to delete marked graph node " + node,
-                                                    e );
-                                        }
+                                        HystrixCassandra.async(nodeSerialization.delete( scope, node, maxVersion.get() ));
                                     }
                                 } );
                     }
@@ -216,12 +210,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
                             targetNodes.add( new TargetPair( edge.getTargetNode(), edge.getType() ) );
                         }
 
-                        try {
-                            batch.execute();
-                        }
-                        catch ( ConnectionException e ) {
-                            throw new GraphRuntimeException( "Unable to delete edges", e );
-                        }
+                        HystrixCassandra.async( batch );
 
                         //now  delete meta data
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
index 788fab4..b14fad6 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
@@ -38,6 +38,7 @@ import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.astyanax.OrganizationScopedRowKeySerializer;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.migration.Migration;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -81,18 +82,17 @@ public class NodeSerializationImpl implements NodeSerialization, Migration {
 
 
     /**
-     * Columns are always a byte, and the entire value is contained within a row key.  This is intentional
-     * This allows us to make heavy use of Cassandra's bloom filters, as well as key caches.
-     * Since most nodes will only exist for a short amount of time in this CF, we'll most likely have them in the key
-     * cache, and we'll also bounce from the BloomFilter on read.  This means our performance will be no worse
-     * than checking a distributed cache in RAM for the existence of a marked node.
+     * Columns are always a byte, and the entire value is contained within a row key.  This is intentional This allows
+     * us to make heavy use of Cassandra's bloom filters, as well as key caches. Since most nodes will only exist for a
+     * short amount of time in this CF, we'll most likely have them in the key cache, and we'll also bounce from the
+     * BloomFilter on read.  This means our performance will be no worse than checking a distributed cache in RAM for
+     * the existence of a marked node.
      */
     private static final MultiTennantColumnFamily<ApplicationScope, Id, Boolean> GRAPH_DELETE =
             new MultiTennantColumnFamily<ApplicationScope, Id, Boolean>( "Graph_Marked_Nodes",
                     new OrganizationScopedRowKeySerializer<Id>( ROW_SERIALIZER ), BOOLEAN_SERIALIZER );
 
 
-
     protected final Keyspace keyspace;
     protected final CassandraConfig fig;
 
@@ -108,7 +108,8 @@ public class NodeSerializationImpl implements NodeSerialization, Migration {
     public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
         return Collections.singleton(
                 new MultiTennantColumnFamilyDefinition( GRAPH_DELETE, BytesType.class.getSimpleName(),
-                        BooleanType.class.getSimpleName(), BytesType.class.getSimpleName(), MultiTennantColumnFamilyDefinition.CacheOption.ALL ) );
+                        BooleanType.class.getSimpleName(), BytesType.class.getSimpleName(),
+                        MultiTennantColumnFamilyDefinition.CacheOption.ALL ) );
     }
 
 
@@ -120,7 +121,7 @@ public class NodeSerializationImpl implements NodeSerialization, Migration {
 
         MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( fig.getWriteCL() );
 
-        batch.withRow( GRAPH_DELETE, ScopedRowKey.fromKey( scope, node ) ).setTimestamp(timestamp )
+        batch.withRow( GRAPH_DELETE, ScopedRowKey.fromKey( scope, node ) ).setTimestamp( timestamp )
              .putColumn( COLUMN_NAME, timestamp );
 
         return batch;
@@ -147,23 +148,26 @@ public class NodeSerializationImpl implements NodeSerialization, Migration {
         ValidationUtils.validateApplicationScope( scope );
         ValidationUtils.verifyIdentity( node );
 
-        ColumnFamilyQuery<ScopedRowKey<ApplicationScope, Id>, Boolean> query = keyspace.prepareQuery( GRAPH_DELETE ).setConsistencyLevel(
-                fig.getReadCL() );
+        ColumnFamilyQuery<ScopedRowKey<ApplicationScope, Id>, Boolean> query =
+                keyspace.prepareQuery( GRAPH_DELETE ).setConsistencyLevel( fig.getReadCL() );
 
 
         try {
-            Column<Boolean> result =
-                    query.getKey( ScopedRowKey.fromKey( scope, node ) ).getColumn( COLUMN_NAME ).execute().getResult();
+            Column<Boolean> result = HystrixCassandra
+                    .user( query.getKey( ScopedRowKey.fromKey( scope, node ) ).getColumn( COLUMN_NAME ) )
+                    .getResult();
 
             return Optional.of( result.getLongValue() );
         }
-        catch ( NotFoundException e ) {
-            //swallow, there's just no column
-            return Optional.absent();
-        }
-        catch ( ConnectionException e ) {
-            throw new GraphRuntimeException( "Unable to connect to casandra", e );
+        catch (RuntimeException re ) {
+            if(re.getCause().getCause() instanceof   NotFoundException) {
+                //swallow, there's just no column
+                return Optional.absent();
+            }
+
+            throw re;
         }
+
     }
 
 
@@ -173,36 +177,34 @@ public class NodeSerializationImpl implements NodeSerialization, Migration {
         Preconditions.checkNotNull( edges, "edges cannot be null" );
 
 
-        final ColumnFamilyQuery<ScopedRowKey<ApplicationScope, Id>, Boolean> query = keyspace.prepareQuery( GRAPH_DELETE ).setConsistencyLevel( fig.getReadCL() );
+        final ColumnFamilyQuery<ScopedRowKey<ApplicationScope, Id>, Boolean> query =
+                keyspace.prepareQuery( GRAPH_DELETE ).setConsistencyLevel( fig.getReadCL() );
 
 
-        final List<ScopedRowKey<ApplicationScope, Id>> keys = new ArrayList<ScopedRowKey<ApplicationScope, Id>>(edges.size());
+        final List<ScopedRowKey<ApplicationScope, Id>> keys =
+                new ArrayList<ScopedRowKey<ApplicationScope, Id>>( edges.size() );
 
         //worst case all are marked
-        final Map<Id, Long> versions = new HashMap<>(edges.size());
+        final Map<Id, Long> versions = new HashMap<>( edges.size() );
 
-        for(final Edge edge: edges){
+        for ( final Edge edge : edges ) {
             keys.add( ScopedRowKey.fromKey( scope, edge.getSourceNode() ) );
             keys.add( ScopedRowKey.fromKey( scope, edge.getTargetNode() ) );
         }
 
-        try {
-            final Rows<ScopedRowKey<ApplicationScope, Id>, Boolean>
-                    results = query.getRowSlice( keys ).withColumnSlice( Collections.singletonList( COLUMN_NAME ) ).execute().getResult();
-
-            for(Row<ScopedRowKey<ApplicationScope, Id>, Boolean> row: results){
-                Column<Boolean> column = row.getColumns().getColumnByName( COLUMN_NAME );
 
-                if(column != null){
-                    versions.put( row.getKey().getKey(), column.getLongValue() );
-                }
+        final Rows<ScopedRowKey<ApplicationScope, Id>, Boolean> results = HystrixCassandra
+                .user( query.getRowSlice( keys ).withColumnSlice( Collections.singletonList( COLUMN_NAME ) ) )
+                .getResult();
 
+        for ( Row<ScopedRowKey<ApplicationScope, Id>, Boolean> row : results ) {
+            Column<Boolean> column = row.getColumns().getColumnByName( COLUMN_NAME );
 
+            if ( column != null ) {
+                versions.put( row.getKey().getKey(), column.getLongValue() );
             }
         }
-        catch ( ConnectionException e ) {
-            throw new GraphRuntimeException( "Unable to execute multiget for all max versions", e );
-        }
+
 
         return versions;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
index 6b99e93..d3d92ea 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
@@ -35,6 +35,7 @@ import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.astyanax.OrganizationScopedRowKeySerializer;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.graph.GraphFig;
@@ -119,18 +120,20 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
 
 
         try {
-            OperationResult<Column<Boolean>> column =
-                    keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( true ).execute();
+            OperationResult<Column<Boolean>> column = HystrixCassandra.user(
+                    keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( true ) );
 
             return column.getResult().getLongValue();
         }
         //column not found, return 0
-        catch ( NotFoundException nfe ) {
-            return 0;
-        }
-        catch ( ConnectionException e ) {
-            throw new GraphRuntimeException( "An error occurred connecting to cassandra", e );
+        catch ( RuntimeException re ) {
+            if(re.getCause() instanceof NotFoundException) {
+                return 0;
+            }
+
+            throw  re;
         }
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 3c3b99c..6c79814 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
+import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.graph.GraphFig;
@@ -45,6 +46,7 @@ import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
+import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 
@@ -56,7 +58,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
     private static final Logger LOG = LoggerFactory.getLogger( NodeShardAllocationImpl.class );
 
-    private static final Shard MIN_SHARD = new Shard(0, 0, true);
+    private static final Shard MIN_SHARD = new Shard( 0, 0, true );
 
     private final EdgeShardSerialization edgeShardSerialization;
     private final EdgeColumnFamilies edgeColumnFamilies;
@@ -82,32 +84,29 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
 
     @Override
-    public Iterator<ShardEntryGroup> getShards( final ApplicationScope scope,
-                                                final Optional<Shard> maxShardId, final DirectedEdgeMeta directedEdgeMeta ) {
+    public Iterator<ShardEntryGroup> getShards( final ApplicationScope scope, final Optional<Shard> maxShardId,
+                                                final DirectedEdgeMeta directedEdgeMeta ) {
 
-        ValidationUtils.validateApplicationScope(scope);
+        ValidationUtils.validateApplicationScope( scope );
         Preconditions.checkNotNull( maxShardId, "maxShardId cannot be null" );
         GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta );
 
         Iterator<Shard> existingShards;
 
         //its a new node, it doesn't need to check cassandra, it won't exist
-        if(isNewNode(directedEdgeMeta)){
+        if ( isNewNode( directedEdgeMeta ) ) {
             existingShards = Collections.singleton( MIN_SHARD ).iterator();
         }
 
-        else{
+        else {
             existingShards = edgeShardSerialization.getShardMetaData( scope, maxShardId, directedEdgeMeta );
         }
 
-        if(existingShards == null || !existingShards.hasNext()){
+        if ( existingShards == null || !existingShards.hasNext() ) {
 
-            try {
-                edgeShardSerialization.writeShardMeta( scope, MIN_SHARD, directedEdgeMeta ).execute();
-            }
-            catch ( ConnectionException e ) {
-                throw new GraphRuntimeException( "Unable to allocate minimum shard" );
-            }
+
+            final MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, MIN_SHARD, directedEdgeMeta );
+            HystrixCassandra.user( batch  );
 
             existingShards = Collections.singleton( MIN_SHARD ).iterator();
         }
@@ -117,9 +116,10 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
 
     @Override
-    public boolean auditShard( final ApplicationScope scope, final ShardEntryGroup shardEntryGroup, final DirectedEdgeMeta directedEdgeMeta) {
+    public boolean auditShard( final ApplicationScope scope, final ShardEntryGroup shardEntryGroup,
+                               final DirectedEdgeMeta directedEdgeMeta ) {
 
-        ValidationUtils.validateApplicationScope(scope);
+        ValidationUtils.validateApplicationScope( scope );
         GraphValidation.validateShardEntryGroup( shardEntryGroup );
         GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta );
 
@@ -129,12 +129,12 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
         /**
          * Nothing to do, it's been created very recently, we don't create a new one
          */
-        if (shardEntryGroup.isCompactionPending()) {
+        if ( shardEntryGroup.isCompactionPending() ) {
             return false;
         }
 
         //we can't allocate, we have more than 1 write shard currently.  We need to compact first
-        if(shardEntryGroup.entrySize() != 1){
+        if ( shardEntryGroup.entrySize() != 1 ) {
             return false;
         }
 
@@ -145,7 +145,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
         final Shard shard = shardEntryGroup.getMinShard();
 
 
-        if (shard.getCreatedTime() >= getMinTime()){
+        if ( shard.getCreatedTime() >= getMinTime() ) {
             return false;
         }
 
@@ -154,8 +154,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
          * Check out if we have a count for our shard allocation
          */
 
-        final long count =
-                nodeShardApproximation.getCount( scope, shard, directedEdgeMeta);
+        final long count = nodeShardApproximation.getCount( scope, shard, directedEdgeMeta );
 
 
         if ( count < graphFig.getShardSize() ) {
@@ -163,13 +162,12 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
         }
 
 
-
-
         /**
          * Allocate the shard
          */
 
-        final Iterator<MarkedEdge> edges  = directedEdgeMeta.loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup, Long.MAX_VALUE );
+        final Iterator<MarkedEdge> edges = directedEdgeMeta
+                .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup, Long.MAX_VALUE );
 
 
         if ( !edges.hasNext() ) {
@@ -184,17 +182,12 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
         final long createTimestamp = timeService.getCurrentTime();
 
-        final Shard newShard = new Shard(marked.getTimestamp(), createTimestamp, false);
+        final Shard newShard = new Shard( marked.getTimestamp(), createTimestamp, false );
 
 
-        try {
-            this.edgeShardSerialization
-                    .writeShardMeta( scope, newShard, directedEdgeMeta )
-                    .execute();
-        }
-        catch ( ConnectionException e ) {
-            throw new GraphRuntimeException( "Unable to write the new edge metadata" );
-        }
+        final MutationBatch batch = this.edgeShardSerialization.writeShardMeta( scope, newShard, directedEdgeMeta );
+
+        HystrixCassandra.user( batch );
 
 
         return true;
@@ -222,28 +215,22 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
     /**
      * Return true if the node has been created within our timeout.  If this is the case, we dont' need to check
      * cassandra, we know it won't exist
-     *
-     * @param directedEdgeMeta
-     * @return
      */
-    private boolean isNewNode(DirectedEdgeMeta directedEdgeMeta){
+    private boolean isNewNode( DirectedEdgeMeta directedEdgeMeta ) {
 
         /**
          * the max time in microseconds we can allow
          */
-        final long maxTime = (timeService.getCurrentTime() + graphFig.getShardCacheTimeout() )* 10000;
+        final long maxTime = ( timeService.getCurrentTime() + graphFig.getShardCacheTimeout() ) * 10000;
 
-        for(DirectedEdgeMeta.NodeMeta node: directedEdgeMeta.getNodes()){
+        for ( DirectedEdgeMeta.NodeMeta node : directedEdgeMeta.getNodes() ) {
             final long uuidTime = node.getId().getUuid().timestamp();
 
-            if(uuidTime < maxTime){
+            if ( uuidTime < maxTime ) {
                 return true;
             }
         }
 
         return false;
-
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
new file mode 100644
index 0000000..8f5171b
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -0,0 +1,337 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.graph;
+
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runners.model.InitializationError;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.commons.lang.time.StopWatch;
+
+import org.apache.usergrid.persistence.core.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.core.cassandra.ITRunner;
+import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
+import org.apache.usergrid.persistence.core.migration.MigrationException;
+import org.apache.usergrid.persistence.core.migration.MigrationManager;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+import rx.Observable;
+import rx.Subscriber;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+
+@Ignore("A stress test, not part of functional testing")
+public class GraphManagerShardConsistencyIT {
+    private static final Logger log = LoggerFactory.getLogger( GraphManagerShardConsistencyIT.class );
+
+
+    @ClassRule
+    public static CassandraRule rule = new CassandraRule();
+
+
+    protected ApplicationScope scope;
+
+    protected int numWorkers;
+
+    @Before
+    public void setupOrg() {
+
+
+
+        //get the system property of the UUID to use.  If one is not set, use the defualt
+        String uuidString = System.getProperty( "org.id", "80a42760-b699-11e3-a5e2-0800200c9a66" );
+
+        scope = new ApplicationScopeImpl( createId( UUID.fromString( uuidString ), "test" ) );
+
+        numWorkers = Integer.parseInt( System.getProperty( "numWorkers", "4" ) );
+//        readCount = Integer.parseInt( System.getProperty( "readCount", "20000" ) );
+    }
+
+
+//    @Test
+//    public void writeThousandsSingleSource() throws InterruptedException, ExecutionException {
+//        EdgeGenerator generator = new EdgeGenerator() {
+//
+//            private Id sourceId = createId( "source" );
+//
+//
+//            @Override
+//            public Edge newEdge() {
+//                Edge edge = createEdge( sourceId, "test", createId( "target" ) );
+//
+//
+//                return edge;
+//            }
+//
+//
+//            @Override
+//            public Observable<Edge> doSearch( final GraphManager manager ) {
+//                return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), null ) );
+//            }
+//        };
+//
+//
+//
+//        doTest( generator );
+//    }
+
+
+    @Test
+    public void writeThousandsSingleTarget() throws InterruptedException, ExecutionException, MigrationException {
+
+        final Id targetId = createId("target");
+        final String edgeType = "test";
+
+        EdgeGenerator generator = new EdgeGenerator() {
+
+
+            @Override
+            public Edge newEdge() {
+                Edge edge = createEdge( createId( "source" ), edgeType,  targetId );
+
+
+                return edge;
+            }
+
+
+            @Override
+            public Observable<Edge> doSearch( final GraphManager manager ) {
+                return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", System.currentTimeMillis(), null ) );
+            }
+        };
+
+
+        /**
+         * create 3 injectors.  This way all the caches are independent of one another.  This is the same as
+         * multiple nodes
+         */
+        final List<Injector> injectors = createInjectors(3);
+
+
+        final GraphFig graphFig = getInstance( injectors, GraphFig.class );
+
+        final long shardSize =  graphFig.getShardSize();
+
+        /**
+         * Do 4x shard size so we should have approximately 4 shards
+         */
+        final long numberOfEdges =  shardSize * 4;
+
+        final long countPerWorker = numberOfEdges/numWorkers;
+
+        final long writeLimit = countPerWorker;
+
+
+//        HystrixCassandra.ASYNC_GROUP.
+
+
+
+
+
+        final List<Future<Boolean>> futures = new ArrayList<>();
+
+        for(Injector injector: injectors) {
+            final GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class );
+
+            futures.addAll( doTest( gmf, generator, writeLimit ) );
+        }
+
+        for(Future<Boolean> future: futures){
+            future.get();
+        }
+
+        //now get all our shards
+        final NodeShardAllocation allocation = getInstance( injectors, NodeShardAllocation.class );
+
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNode( targetId, edgeType );
+
+        int count = 0;
+
+        while(count < 4) {
+
+            //reset our count.  Ultimately we'll have 4 groups once our compaction completes
+            count = 0;
+
+            final Iterator<ShardEntryGroup> groups = allocation.getShards( scope, Optional.<Shard>absent(), directedEdgeMeta );
+
+            while(groups.hasNext()){
+                final ShardEntryGroup group = groups.next();
+
+                log.info( "Compaction pending status for group {} is {}", group, group.isCompactionPending() );
+
+                count++;
+
+            }
+        }
+
+
+
+
+    }
+
+    private <T> T getInstance(final List<Injector> injectors, Class<T> clazz ){
+        return injectors.get( 0 ).getInstance( clazz );
+    }
+
+
+    /**
+     * Create new Guice injector environments and return them
+     * @param count
+     */
+    private List<Injector> createInjectors( int count ) throws MigrationException {
+
+        final List<Injector> injectors = new ArrayList<>(count);
+
+        for(int i = 0; i < count; i++){
+            final Injector injector = Guice.createInjector( new TestGraphModule() );
+            injectors.add( injector );
+        }
+
+
+        final MigrationManager migrationManager = getInstance( injectors, MigrationManager.class );
+
+        migrationManager.migrate();
+
+        return injectors;
+
+
+    }
+
+    /**
+     * Execute the test with the generator
+     */
+    private List<Future<Boolean>> doTest(final GraphManagerFactory factory, final EdgeGenerator generator, final long writeCount ) throws InterruptedException, ExecutionException {
+
+        ExecutorService executor = Executors.newFixedThreadPool( numWorkers );
+
+        List<Future<Boolean>> futures = new ArrayList<>( numWorkers );
+
+        for ( int i = 0; i < numWorkers; i++ ) {
+            Future<Boolean> future = executor.submit( new Worker(factory, generator, writeCount ) );
+
+            futures.add( future );
+        }
+
+
+        return futures;
+    }
+
+
+    private class Worker implements Callable<Boolean> {
+        private final GraphManagerFactory factory;
+        private final EdgeGenerator generator;
+        private final long writeLimit;
+
+
+        private Worker( final GraphManagerFactory factory, final EdgeGenerator generator, final long writeLimit) {
+            this.factory = factory;
+            this.generator = generator;
+            this.writeLimit = writeLimit;
+        }
+
+
+        @Override
+        public Boolean call() throws Exception {
+            GraphManager manager = factory.createEdgeManager( scope );
+
+
+            final StopWatch timer = new StopWatch();
+            timer.start();
+
+            for ( long i = 0; i < writeLimit; i++ ) {
+
+                Edge edge = generator.newEdge();
+
+                Edge returned = manager.writeEdge( edge ).toBlocking().last();
+
+
+                assertNotNull( "Returned has a version", returned.getTimestamp() );
+
+
+                if ( i % 1000 == 0 ) {
+                    log.info( "   Wrote: " + i );
+                }
+            }
+
+            timer.stop();
+            log.info( "Total time to write {} entries {} ms", writeLimit, timer.getTime() );
+
+            return true;
+        }
+    }
+
+
+    private interface EdgeGenerator {
+
+        /**
+         * Create a new edge to persiste
+         */
+        public Edge newEdge();
+
+        /**
+         * Perform the search returning an observable edge
+         * @param manager
+         * @return
+         */
+        public Observable<Edge> doSearch( final GraphManager manager );
+    }
+
+
+}
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java
index 3c71ae8..27f8479 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java
@@ -86,7 +86,7 @@ public class StorageGraphManagerIT extends GraphManagerIT {
                 };
 
         gmi.setEdgeDeleteSubcriber(subscriber );
-        gmi.setEdgeWriteSubcriber( subscriber );
+//        gmi.setEdgeWriteSubcriber( subscriber );
         gmi.setNodeDelete( subscriber );
 
         return helper;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListenerTest.java
index 3bcbfd0..1f6df4c 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListenerTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListenerTest.java
@@ -38,7 +38,6 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.guice.StorageEdgeSerialization;
 import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
 import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteListener;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
@@ -80,7 +79,6 @@ public class EdgeDeleteListenerTest {
 
 
     @Inject
-    @StorageEdgeSerialization
     protected EdgeSerialization storageEdgeSerialization;
 
 
@@ -220,7 +218,7 @@ public class EdgeDeleteListenerTest {
 
         final UUID foobar = UUIDGenerator.newTimeUUID();
 
-           storageEdgeSerialization.writeEdge( scope, edgeV1, foobar ).execute();
+        storageEdgeSerialization.writeEdge( scope, edgeV1, foobar ).execute();
         storageEdgeSerialization.writeEdge( scope, edgeV2, foobar ).execute();
         storageEdgeSerialization.writeEdge( scope, edgeV3, foobar ).execute();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
index f2a8cee..3b66a31 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
@@ -39,7 +39,6 @@ import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.guice.StorageEdgeSerialization;
 import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
 import org.apache.usergrid.persistence.graph.impl.stage.NodeDeleteListener;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
@@ -83,7 +82,6 @@ public class NodeDeleteListenerTest {
     protected NodeDeleteListener deleteListener;
 
     @Inject
-    @StorageEdgeSerialization
     protected EdgeSerialization edgeSerialization;
 
     @Inject

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
index 91767b0..4b62ad1 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
@@ -34,7 +34,6 @@ import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.core.cassandra.ITRunner;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.guice.StorageEdgeSerialization;
 import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
@@ -71,7 +70,6 @@ public class EdgeDeleteRepairTest {
 
 
     @Inject
-    @StorageEdgeSerialization
     protected EdgeSerialization storageEdgeSerialization;
 
     @Inject

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java
index 6be02c4..f14e007 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java
@@ -35,7 +35,6 @@ import org.apache.usergrid.persistence.core.cassandra.ITRunner;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.guice.StorageEdgeSerialization;
 import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
@@ -73,7 +72,6 @@ public class EdgeMetaRepairTest {
     protected EdgeMetaRepair edgeMetaRepair;
 
     @Inject
-    @StorageEdgeSerialization
     protected EdgeSerialization storageEdgeSerialization;
 
     @Inject

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationChopTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationChopTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationChopTest.java
index 6da3506..54eec6b 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationChopTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationChopTest.java
@@ -32,7 +32,6 @@ import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.core.cassandra.ITRunner;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.guice.StorageEdgeSerialization;
 import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
@@ -64,7 +63,6 @@ public class EdgeSerializationChopTest {
 
 
     @Inject
-    @StorageEdgeSerialization
     protected EdgeSerialization serialization;
 
     protected ApplicationScope scope;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/PermanentSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/PermanentSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/PermanentSerializationTest.java
index 2ff571b..caf1833 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/PermanentSerializationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/PermanentSerializationTest.java
@@ -24,7 +24,6 @@ import org.jukito.UseModules;
 import org.junit.runner.RunWith;
 
 import org.apache.usergrid.persistence.core.cassandra.ITRunner;
-import org.apache.usergrid.persistence.graph.guice.StorageEdgeSerialization;
 import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
 
 import com.google.inject.Inject;
@@ -35,7 +34,6 @@ import com.google.inject.Inject;
 public class PermanentSerializationTest extends EdgeSerializationTest {
 
     @Inject
-    @StorageEdgeSerialization
     protected EdgeSerialization edgeSerialization;
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties b/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties
index fd24f43..61612f6 100644
--- a/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties
+++ b/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties
@@ -6,6 +6,7 @@ cassandra.hosts=localhost
 cassandra.cluster_name=Usergrid
 collections.keyspace=Usergrid_Collections
 cassandra.timeout=5000
+#cassandra.embedded=true
 
 
 collections.keyspace.strategy.options=replication_factor:1

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 9565543..a9223f9 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -48,7 +48,7 @@
         <guava.version>15.0</guava.version>
         <guice.version>3.0</guice.version>
         <guicyfig.version>3.2</guicyfig.version>
-        <hystrix.version>1.3.15</hystrix.version>
+        <hystrix.version>1.3.16</hystrix.version>
         <jackson-2-version>2.3.1</jackson-2-version>
         <jackson-smile.verson>1.9.9</jackson-smile.verson>
         <jukito.version>1.4-UG</jukito.version>


[3/3] git commit: Finished refactor of low level serialization to remove impedance mismatch of apis between read and write.

Posted by to...@apache.org.
Finished refactor of low level serialization to remove impedance mismatch of apis between read and write.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c69c1974
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c69c1974
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c69c1974

Branch: refs/heads/USERGRID-188
Commit: c69c1974ade1a0a580ccf633f6a1cb8dc14dd1f0
Parents: cef3de9
Author: Todd Nine <to...@apache.org>
Authored: Fri Aug 15 16:49:25 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Mon Aug 18 13:41:31 2014 -0600

----------------------------------------------------------------------
 .../core/astyanax/ColumnNameIterator.java       |  24 +-
 .../core/hystrix/HystrixCassandra.java          |  10 +-
 stack/corepersistence/graph/pom.xml             |   7 +
 .../usergrid/persistence/graph/GraphFig.java    |  13 +
 .../graph/impl/CollectionIndexObserver.java     |   2 +-
 .../impl/EdgeSerializationImpl.java             | 256 ++++++-
 .../impl/shard/DirectedEdgeMeta.java            |  65 +-
 .../impl/shard/ShardEntryGroup.java             |  16 +-
 .../impl/shard/ShardGroupCompaction.java        |  39 +-
 .../impl/shard/ShardedEdgeSerialization.java    |  91 ++-
 .../NodeShardCounterSerializationImpl.java      |   2 +-
 .../impl/shard/impl/EdgeSearcher.java           |  32 +-
 .../shard/impl/NodeShardAllocationImpl.java     |  17 +-
 .../impl/shard/impl/NodeShardCacheImpl.java     | 127 ++--
 .../shard/impl/ShardGroupColumnIterator.java    | 130 ++++
 .../shard/impl/ShardGroupCompactionImpl.java    | 242 ++++++-
 .../impl/shard/impl/ShardRowIterator.java       | 134 ----
 .../impl/ShardedEdgeSerializationImpl.java      | 707 ++++++++++++++-----
 .../impl/shard/impl/ShardsColumnIterator.java   | 128 ++++
 .../graph/GraphManagerShardConsistencyIT.java   | 126 +++-
 .../impl/shard/NodeShardAllocationTest.java     |   2 +-
 .../shard/count/NodeShardApproximationTest.java |  12 +
 .../test/resources/usergrid-SHARD.properties    |  23 +
 23 files changed, 1682 insertions(+), 523 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIterator.java
index 050157a..af4e1f9 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIterator.java
@@ -40,20 +40,15 @@ public class ColumnNameIterator<C, T> implements Iterable<T>, Iterator<T> {
 
     private final RowQuery<?, C> rowQuery;
     private final ColumnParser<C, T> parser;
+    private final boolean skipFirst;
 
     private Iterator<Column<C>> sourceIterator;
 
 
-    public ColumnNameIterator( RowQuery<?, C> rowQuery, final ColumnParser<C, T> parser, final boolean skipFirst) {
+    public ColumnNameIterator( RowQuery<?, C> rowQuery, final ColumnParser<C, T> parser, final boolean skipFirst ) {
         this.rowQuery = rowQuery.autoPaginate( true );
         this.parser = parser;
-
-        advanceIterator();
-
-        //if we are to skip the first element, we need to advance the iterator
-        if ( skipFirst && sourceIterator.hasNext() ) {
-            sourceIterator.next();
-        }
+        this.skipFirst = skipFirst;
     }
 
 
@@ -65,6 +60,19 @@ public class ColumnNameIterator<C, T> implements Iterable<T>, Iterator<T> {
 
     @Override
     public boolean hasNext() {
+
+        if ( sourceIterator == null ) {
+            advanceIterator();
+
+
+            //if we are to skip the first element, we need to advance the iterator
+            if ( skipFirst && sourceIterator.hasNext() ) {
+                sourceIterator.next();
+            }
+
+            return sourceIterator.hasNext();
+        }
+
         //if we've exhausted this iterator, try to advance to the next set
         if ( sourceIterator.hasNext() ) {
             return true;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
index 7758651..356850e 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
@@ -42,20 +42,22 @@ public class HystrixCassandra {
     /**
      * Command group used for realtime user commands
      */
-    public static final HystrixCommandGroupKey USER_GROUP = HystrixCommandGroupKey.Factory.asKey( "user" );
+    public static final HystrixCommand.Setter
+            USER_GROUP = HystrixCommand.Setter.withGroupKey(   HystrixCommandGroupKey.Factory.asKey( "user" ) ).andThreadPoolPropertiesDefaults(
+            HystrixThreadPoolProperties.Setter().withCoreSize( 100 ) );
 
     /**
      * Command group for asynchronous operations
      */
-    public static final HystrixCommandGroupKey ASYNC_GROUP = HystrixCommandGroupKey.Factory.asKey( "async" );
+    public static final HystrixCommand.Setter
+            ASYNC_GROUP = HystrixCommand.Setter.withGroupKey( HystrixCommandGroupKey.Factory.asKey( "async" ) ).andThreadPoolPropertiesDefaults(
+            HystrixThreadPoolProperties.Setter().withCoreSize( 50 ) );
 
 
     /**
      * Execute an user operation
      */
     public static <R> OperationResult<R> user( final Execution<R> execution) {
-
-
         return new HystrixCommand<OperationResult<R>>( USER_GROUP ) {
 
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/pom.xml b/stack/corepersistence/graph/pom.xml
index eee82fd..8174a56 100644
--- a/stack/corepersistence/graph/pom.xml
+++ b/stack/corepersistence/graph/pom.xml
@@ -72,6 +72,13 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+        <groupId>com.codahale.metrics</groupId>
+        <artifactId>metrics-core</artifactId>
+        <version>3.0.2</version>
+        <scope>test</scope>
+    </dependency>
+
 
 
   </dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 45064c9..d434db7 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -45,6 +45,12 @@ public interface GraphFig extends GuicyFig {
      */
     public static final String SHARD_CACHE_TIMEOUT = "usergrid.graph.shard.cache.timeout";
 
+    public static final String SHARD_CACHE_REFRESH_WORKERS = "usergrid.graph.shard.refresh.worker.count";
+
+    public static final String SHARD_REPAIR_CHANCE = "usergrid.graph.shard.repair.chance";
+
+
+
     /**
      * The minimum amount of time than can occur (in millis) between shard allocation.  Must be at least 2x the cache timeout.
      *
@@ -74,6 +80,10 @@ public interface GraphFig extends GuicyFig {
     int getRepairConcurrentSize();
 
 
+    @Default( ".10" )
+    @Key( SHARD_REPAIR_CHANCE  )
+    double getShardRepairChance();
+
 
     @Default("500000")
     @Key(SHARD_SIZE)
@@ -94,6 +104,9 @@ public interface GraphFig extends GuicyFig {
     long getShardCacheSize();
 
 
+    @Default( "2" )
+    @Key( SHARD_CACHE_REFRESH_WORKERS )
+    int getShardCacheRefreshWorkerCount();
 
 
     @Default( "10000" )

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java
index a4a7a70..d792463 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java
@@ -61,7 +61,7 @@ public class CollectionIndexObserver{
 //
 //        //entity exists, write the edge
 //        if(entity.getEntity().isPresent()){
-//            em.writeEdge( edge ).toBlocking().last();
+//            em.writeEdgeFromSource( edge ).toBlocking().last();
 //        }
 //        //entity does not exist, it's been removed, mark the edge
 //        else{

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index d3f29a0..4c1ae79 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -20,12 +20,14 @@
 package org.apache.usergrid.persistence.graph.serialization.impl;
 
 
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.UUID;
 
 import javax.inject.Inject;
 
 import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.graph.GraphFig;
@@ -37,8 +39,10 @@ import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupColumnIterator;
 import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -53,7 +57,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
  * Serialization for edges.  Delegates partitioning to the sharding strategy.
  */
 @Singleton
-public class EdgeSerializationImpl implements EdgeSerialization  {
+public class EdgeSerializationImpl implements EdgeSerialization {
 
 
     protected final Keyspace keyspace;
@@ -62,18 +66,23 @@ public class EdgeSerializationImpl implements EdgeSerialization  {
     protected final EdgeShardStrategy edgeShardStrategy;
     protected final EdgeColumnFamilies edgeColumnFamilies;
     protected final ShardedEdgeSerialization shardedEdgeSerialization;
+    protected final TimeService timeService;
 
 
     @Inject
     public EdgeSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
                                   final GraphFig graphFig, final EdgeShardStrategy edgeShardStrategy,
                                   final EdgeColumnFamilies edgeColumnFamilies,
-                                  final ShardedEdgeSerialization shardedEdgeSerialization ) {
+                                  final ShardedEdgeSerialization shardedEdgeSerialization,
+                                  final TimeService timeService ) {
 
 
-        checkNotNull( "keyspace required", keyspace );
-        checkNotNull( "cassandraConfig required", cassandraConfig );
-        checkNotNull( "consistencyFig required", graphFig );
+        checkNotNull( keyspace, "keyspace required" );
+        checkNotNull( cassandraConfig, "cassandraConfig required" );
+        checkNotNull( edgeShardStrategy, "edgeShardStrategy required" );
+        checkNotNull( edgeColumnFamilies, "edgeColumnFamilies required" );
+        checkNotNull( shardedEdgeSerialization, "shardedEdgeSerialization required" );
+        checkNotNull( timeService, "timeService required" );
 
 
         this.keyspace = keyspace;
@@ -82,18 +91,186 @@ public class EdgeSerializationImpl implements EdgeSerialization  {
         this.edgeShardStrategy = edgeShardStrategy;
         this.edgeColumnFamilies = edgeColumnFamilies;
         this.shardedEdgeSerialization = shardedEdgeSerialization;
+        this.timeService = timeService;
     }
 
 
     @Override
     public MutationBatch writeEdge( final ApplicationScope scope, final MarkedEdge markedEdge, final UUID timestamp ) {
-        return shardedEdgeSerialization.writeEdge( edgeColumnFamilies, scope, markedEdge, timestamp );
+
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateEdge( markedEdge );
+        ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+        final long now = timeService.getCurrentTime();
+        final Id sourceNode = markedEdge.getSourceNode();
+        final Id targetNode = markedEdge.getTargetNode();
+        final String edgeType = markedEdge.getType();
+        final long edgeTimestamp = markedEdge.getTimestamp();
+
+        /**
+         * Source write
+         */
+        final DirectedEdgeMeta sourceEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceNode, edgeType );
+
+        final Collection<Shard> sourceWriteShards =
+                edgeShardStrategy.getWriteShards( scope, edgeTimestamp, sourceEdgeMeta ).getWriteShards( now );
+
+        final MutationBatch batch = shardedEdgeSerialization
+                .writeEdgeFromSource( edgeColumnFamilies, scope, markedEdge, sourceWriteShards, sourceEdgeMeta,
+                        timestamp );
+
+
+        /**
+         * Source with target  type write
+         */
+        final DirectedEdgeMeta sourceTargetTypeEdgeMeta =
+                DirectedEdgeMeta.fromSourceNodeTargetType( sourceNode, edgeType, targetNode.getType() );
+
+        final Collection<Shard> sourceTargetTypeWriteShards =
+                edgeShardStrategy.getWriteShards( scope, edgeTimestamp, sourceTargetTypeEdgeMeta )
+                                 .getWriteShards( now );
+
+        batch.mergeShallow( shardedEdgeSerialization
+                .writeEdgeFromSourceWithTargetType( edgeColumnFamilies, scope, markedEdge, sourceTargetTypeWriteShards,
+                        sourceTargetTypeEdgeMeta, timestamp ) );
+
+
+        /**
+         * Target write
+         *
+         */
+
+        final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromTargetNode( targetNode, edgeType );
+
+        final Collection<Shard> targetWriteShards =
+                edgeShardStrategy.getWriteShards( scope, edgeTimestamp, targetEdgeMeta ).getWriteShards( now );
+
+        batch.mergeShallow( shardedEdgeSerialization
+                .writeEdgeToTarget( edgeColumnFamilies, scope, markedEdge, targetWriteShards, targetEdgeMeta,
+                        timestamp ) );
+
+
+        /**
+         * Target with source type write
+         */
+
+        final DirectedEdgeMeta targetSourceTypeEdgeMeta =
+                DirectedEdgeMeta.fromTargetNodeSourceType( targetNode, edgeType, sourceNode.getType() );
+
+        final Collection<Shard> targetSourceTypeWriteShards =
+                edgeShardStrategy.getWriteShards( scope, edgeTimestamp, targetSourceTypeEdgeMeta )
+                                 .getWriteShards( now );
+
+        batch.mergeShallow( shardedEdgeSerialization
+                .writeEdgeToTargetWithSourceType( edgeColumnFamilies, scope, markedEdge, targetSourceTypeWriteShards,
+                        targetSourceTypeEdgeMeta, timestamp ) );
+
+
+        /**
+         * Version write
+         */
+
+        final DirectedEdgeMeta edgeVersionsMeta = DirectedEdgeMeta.fromEdge( sourceNode, targetNode, edgeType );
+
+        final Collection<Shard> edgeVersionsShards =
+                edgeShardStrategy.getWriteShards( scope, edgeTimestamp, edgeVersionsMeta ).getWriteShards( now );
+
+        batch.mergeShallow( shardedEdgeSerialization
+                .writeEdgeVersions( edgeColumnFamilies, scope, markedEdge, edgeVersionsShards,
+                        edgeVersionsMeta, timestamp ) );
+
+
+        return batch;
     }
 
 
     @Override
     public MutationBatch deleteEdge( final ApplicationScope scope, final MarkedEdge markedEdge, final UUID timestamp ) {
-        return shardedEdgeSerialization.deleteEdge( edgeColumnFamilies, scope, markedEdge, timestamp );
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateEdge( markedEdge );
+        ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+        final long now = timeService.getCurrentTime();
+        final Id sourceNode = markedEdge.getSourceNode();
+        final Id targetNode = markedEdge.getTargetNode();
+        final String edgeType = markedEdge.getType();
+        final long edgeTimestamp = markedEdge.getTimestamp();
+
+        /**
+         * Source write
+         */
+        final DirectedEdgeMeta sourceEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceNode, edgeType );
+
+        final Collection<Shard> sourceWriteShards =
+                edgeShardStrategy.getWriteShards( scope, edgeTimestamp, sourceEdgeMeta ).getWriteShards( now );
+
+        final MutationBatch batch = shardedEdgeSerialization
+                .deleteEdgeFromSource( edgeColumnFamilies, scope, markedEdge, sourceWriteShards, sourceEdgeMeta,
+                        timestamp );
+
+
+        /**
+         * Source with target  type write
+         */
+        final DirectedEdgeMeta sourceTargetTypeEdgeMeta =
+                DirectedEdgeMeta.fromSourceNodeTargetType( sourceNode, edgeType, targetNode.getType() );
+
+        final Collection<Shard> sourceTargetTypeWriteShards =
+                edgeShardStrategy.getWriteShards( scope, edgeTimestamp, sourceTargetTypeEdgeMeta )
+                                 .getWriteShards( now );
+
+        batch.mergeShallow( shardedEdgeSerialization
+                .deleteEdgeFromSourceWithTargetType( edgeColumnFamilies, scope, markedEdge, sourceTargetTypeWriteShards,
+                        sourceTargetTypeEdgeMeta, timestamp ) );
+
+
+        /**
+         * Target write
+         *
+         */
+
+        final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromTargetNode( targetNode, edgeType );
+
+        final Collection<Shard> targetWriteShards =
+                edgeShardStrategy.getWriteShards( scope, edgeTimestamp, targetEdgeMeta ).getWriteShards( now );
+
+        batch.mergeShallow( shardedEdgeSerialization
+                .deleteEdgeToTarget( edgeColumnFamilies, scope, markedEdge, targetWriteShards, targetEdgeMeta,
+                        timestamp ) );
+
+
+        /**
+         * Target with source type write
+         */
+
+        final DirectedEdgeMeta targetSourceTypeEdgeMeta =
+                DirectedEdgeMeta.fromTargetNodeSourceType( targetNode, edgeType, sourceNode.getType() );
+
+        final Collection<Shard> targetSourceTypeWriteShards =
+                edgeShardStrategy.getWriteShards( scope, edgeTimestamp, targetSourceTypeEdgeMeta )
+                                 .getWriteShards( now );
+
+        batch.mergeShallow( shardedEdgeSerialization
+                .deleteEdgeToTargetWithSourceType( edgeColumnFamilies, scope, markedEdge, targetSourceTypeWriteShards,
+                        targetSourceTypeEdgeMeta, timestamp ) );
+
+
+        /**
+         * Version write
+         */
+
+        final DirectedEdgeMeta edgeVersionsMeta = DirectedEdgeMeta.fromEdge( sourceNode, targetNode, edgeType );
+
+        final Collection<Shard> edgeVersionsShards =
+                edgeShardStrategy.getWriteShards( scope, edgeTimestamp, edgeVersionsMeta ).getWriteShards( now );
+
+        batch.mergeShallow( shardedEdgeSerialization
+                .deleteEdgeVersions( edgeColumnFamilies, scope, markedEdge, edgeVersionsShards,
+                        edgeVersionsMeta, timestamp ) );
+
+
+        return batch;
     }
 
 
@@ -111,9 +288,17 @@ public class EdgeSerializationImpl implements EdgeSerialization  {
 
 
         final Iterator<ShardEntryGroup> readShards =
-                edgeShardStrategy.getReadShards(scope, maxTimestamp, versionMetaData );
+                edgeShardStrategy.getReadShards( scope, maxTimestamp, versionMetaData );
+
+
+        //now create a result iterator with our iterator of read shards
 
-        return shardedEdgeSerialization.getEdgeVersions( edgeColumnFamilies, scope, search, readShards );
+        return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+            @Override
+            protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
+                return shardedEdgeSerialization.getEdgeVersions( edgeColumnFamilies, scope, search, readShards );
+            }
+        };
     }
 
 
@@ -129,13 +314,17 @@ public class EdgeSerializationImpl implements EdgeSerialization  {
         final long maxTimestamp = edgeType.getMaxTimestamp();
 
 
-        final DirectedEdgeMeta sourceMeta = DirectedEdgeMeta.fromSourceNode( sourceId, type );
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, type );
 
 
-        final Iterator<ShardEntryGroup> readShards =
-                edgeShardStrategy.getReadShards(scope, maxTimestamp, sourceMeta );
+        final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards( scope, maxTimestamp, directedEdgeMeta );
 
-        return shardedEdgeSerialization.getEdgesFromSource( edgeColumnFamilies, scope, edgeType, readShards );
+        return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+            @Override
+            protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
+                return shardedEdgeSerialization.getEdgesFromSource( edgeColumnFamilies, scope, edgeType, readShards );
+            }
+        };
     }
 
 
@@ -151,14 +340,19 @@ public class EdgeSerializationImpl implements EdgeSerialization  {
         final String targetType = edgeType.getIdType();
         final long maxTimestamp = edgeType.getMaxTimestamp();
 
-        final DirectedEdgeMeta sourceMeta = DirectedEdgeMeta.fromSourceNodeTargetType( sourceId, type, targetType );
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( sourceId, type, targetType );
 
 
-        final Iterator<ShardEntryGroup> readShards =
-                edgeShardStrategy.getReadShards(scope, maxTimestamp, sourceMeta );
+        final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards( scope, maxTimestamp, directedEdgeMeta );
 
 
-        return shardedEdgeSerialization.getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, edgeType, readShards );
+        return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+            @Override
+            protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
+                return shardedEdgeSerialization
+                        .getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, edgeType, readShards );
+            }
+        };
     }
 
 
@@ -173,15 +367,17 @@ public class EdgeSerializationImpl implements EdgeSerialization  {
         final long maxTimestamp = edgeType.getMaxTimestamp();
 
 
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNode( targetId, type );
 
-        final DirectedEdgeMeta sourceMeta = DirectedEdgeMeta.fromTargetNode( targetId, type );
 
+        final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards( scope, maxTimestamp, directedEdgeMeta );
 
-        final Iterator<ShardEntryGroup> readShards =
-                edgeShardStrategy.getReadShards(scope, maxTimestamp, sourceMeta );
-
-
-        return shardedEdgeSerialization.getEdgesToTarget( edgeColumnFamilies, scope, edgeType, readShards );
+        return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+            @Override
+            protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
+                return shardedEdgeSerialization.getEdgesToTarget( edgeColumnFamilies, scope, edgeType, readShards );
+            }
+        };
     }
 
 
@@ -198,14 +394,18 @@ public class EdgeSerializationImpl implements EdgeSerialization  {
         final long maxTimestamp = edgeType.getMaxTimestamp();
 
 
-        final DirectedEdgeMeta sourceMeta = DirectedEdgeMeta.fromSourceNodeTargetType( targetId, type, sourceType );
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( targetId, type, sourceType );
 
 
-        final Iterator<ShardEntryGroup> readShards =
-                edgeShardStrategy.getReadShards(scope, maxTimestamp, sourceMeta );
+        final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards( scope, maxTimestamp, directedEdgeMeta );
 
 
-        return shardedEdgeSerialization.getEdgesToTargetBySourceType( edgeColumnFamilies, scope, edgeType, readShards );
+        return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+            @Override
+            protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
+                return shardedEdgeSerialization
+                        .getEdgesToTargetBySourceType( edgeColumnFamilies, scope, edgeType, readShards );
+            }
+        };
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
index 89e46d9..92f2548 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard;
 
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -87,6 +88,36 @@ public abstract class DirectedEdgeMeta {
         public NodeType getNodeType() {
             return nodeType;
         }
+
+
+        @Override
+        public boolean equals( final Object o ) {
+            if ( this == o ) {
+                return true;
+            }
+            if ( !( o instanceof NodeMeta ) ) {
+                return false;
+            }
+
+            final NodeMeta nodeMeta = ( NodeMeta ) o;
+
+            if ( !id.equals( nodeMeta.id ) ) {
+                return false;
+            }
+            if ( nodeType != nodeMeta.nodeType ) {
+                return false;
+            }
+
+            return true;
+        }
+
+
+        @Override
+        public int hashCode() {
+            int result = id.hashCode();
+            result = 31 * result + nodeType.hashCode();
+            return result;
+        }
     }
 
 
@@ -125,9 +156,11 @@ public abstract class DirectedEdgeMeta {
      */
     public abstract Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
                                                     final EdgeColumnFamilies edgeColumnFamilies,
-                                                    final ApplicationScope scope, final ShardEntryGroup group,
+                                                    final ApplicationScope scope, final Collection<Shard> shards,
                                                     final long maxValue );
 
+
+
     /**
      * Get the type of this directed edge
      */
@@ -192,7 +225,7 @@ public abstract class DirectedEdgeMeta {
             @Override
             public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
                                                    final EdgeColumnFamilies edgeColumnFamilies,
-                                                   final ApplicationScope scope, final ShardEntryGroup group,
+                                                   final ApplicationScope scope, final Collection<Shard>  shards,
                                                    final long maxValue ) {
 
                 final Id sourceId = nodes[0].id;
@@ -200,8 +233,7 @@ public abstract class DirectedEdgeMeta {
 
                 final SearchByEdgeType search = new SimpleSearchByEdgeType( sourceId, edgeType, maxValue, null );
 
-                return serialization.getEdgesFromSource( edgeColumnFamilies, scope, search,
-                        Collections.singleton( group ).iterator() );
+                return serialization.getEdgesFromSource( edgeColumnFamilies, scope, search, shards );
             }
 
 
@@ -233,9 +265,9 @@ public abstract class DirectedEdgeMeta {
             @Override
             public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
                                                    final EdgeColumnFamilies edgeColumnFamilies,
-                                                   final ApplicationScope scope, final ShardEntryGroup group,
+                                                   final ApplicationScope scope, final Collection<Shard>shards,
                                                    final long maxValue ) {
-
+//
                 final Id sourceId = nodes[0].id;
                 final String edgeType = types[0];
                 final String targetType = types[1];
@@ -243,8 +275,8 @@ public abstract class DirectedEdgeMeta {
                 final SearchByIdType search =
                         new SimpleSearchByIdType( sourceId, edgeType, maxValue, targetType, null );
 
-                return serialization.getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, search,
-                        Collections.singleton( group ).iterator() );
+                return serialization.getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, search, shards);
+
             }
 
 
@@ -272,7 +304,7 @@ public abstract class DirectedEdgeMeta {
             @Override
             public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
                                                    final EdgeColumnFamilies edgeColumnFamilies,
-                                                   final ApplicationScope scope, final ShardEntryGroup group,
+                                                   final ApplicationScope scope, final Collection<Shard>  shards,
                                                    final long maxValue ) {
 
 
@@ -281,8 +313,7 @@ public abstract class DirectedEdgeMeta {
 
                 final SearchByEdgeType search = new SimpleSearchByEdgeType( targetId, edgeType, maxValue, null );
 
-                return serialization.getEdgesToTarget( edgeColumnFamilies, scope, search,
-                        Collections.singleton( group ).iterator() );
+                return serialization.getEdgesToTarget( edgeColumnFamilies, scope, search, shards );
             }
 
 
@@ -311,7 +342,7 @@ public abstract class DirectedEdgeMeta {
             @Override
             public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
                                                    final EdgeColumnFamilies edgeColumnFamilies,
-                                                   final ApplicationScope scope, final ShardEntryGroup group,
+                                                   final ApplicationScope scope, final Collection<Shard> shards,
                                                    final long maxValue ) {
 
                 final Id targetId = nodes[0].id;
@@ -322,8 +353,7 @@ public abstract class DirectedEdgeMeta {
                 final SearchByIdType search =
                         new SimpleSearchByIdType( targetId, edgeType, maxValue, sourceType, null );
 
-                return serialization.getEdgesToTargetBySourceType( edgeColumnFamilies, scope, search,
-                        Collections.singleton( group ).iterator() );
+                return serialization.getEdgesToTargetBySourceType( edgeColumnFamilies, scope, search, shards);
             }
 
 
@@ -355,7 +385,7 @@ public abstract class DirectedEdgeMeta {
             @Override
             public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
                                                    final EdgeColumnFamilies edgeColumnFamilies,
-                                                   final ApplicationScope scope, final ShardEntryGroup group,
+                                                   final ApplicationScope scope, final Collection<Shard>  shards,
                                                    final long maxValue ) {
 
                 final Id sourceId = nodes[0].id;
@@ -365,8 +395,8 @@ public abstract class DirectedEdgeMeta {
                 final SimpleSearchByEdge search =
                         new SimpleSearchByEdge( sourceId, edgeType, targetId, maxValue, null );
 
-                return serialization.getEdgeVersions( edgeColumnFamilies, scope, search,
-                        Collections.singleton( group ).iterator() );
+                return serialization.getEdgeVersions( edgeColumnFamilies, scope, search, shards);
+
             }
 
 
@@ -378,6 +408,7 @@ public abstract class DirectedEdgeMeta {
     }
 
 
+
     /**
      * Create a directed edge from the stored meta data
      * @param metaType The meta type stored

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
index 59ea9fb..70569fd 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -256,13 +256,13 @@ public class ShardEntryGroup {
     }
 
 
-    /**
-     * Helper method to create a shard entry group with a single shard
-     */
-    public static ShardEntryGroup singletonGroup( final Shard shard, final long delta ) {
-        ShardEntryGroup group = new ShardEntryGroup( delta );
-        group.addShard( shard );
-
-        return group;
+    @Override
+    public String toString() {
+        return "ShardEntryGroup{" +
+                "shards=" + shards +
+                ", delta=" + delta +
+                ", maxCreatedTime=" + maxCreatedTime +
+                ", compactionTarget=" + compactionTarget +
+                '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
index 13c5596..bf4d3c9 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -22,6 +22,10 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard;
 
 
+import java.util.Set;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
 import rx.Observable;
 
 
@@ -36,8 +40,39 @@ public interface ShardGroupCompaction {
     /**
      * Execute the compaction task.  Will return the number of edges that have
      * @param group The shard entry group to compact
-     * @return The number of edges that are now compacted into the target shard
+     * @return The shards that were compacted
+     */
+    public Set<Shard> compact(final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, final ShardEntryGroup group);
+
+    /**
+     * Possibly audit the shard entry group.  This is asynchronous and returns immediately
+     * @param group
+     * @return
      */
-    public Observable<Integer> compact(ShardEntryGroup group);
+    public AuditResult evaluateShardGroup( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+                                           final ShardEntryGroup group );
+
+
+    public enum AuditResult{
+        /**
+         * We didn't check this shard
+         */
+        NOT_CHECKED,
+        /**
+         * This shard was checked, but nothing was allocated
+         */
+        CHECKED_NO_OP,
+
+        /**
+         * We checked and created a new shard
+         */
+        CHECKED_CREATED,
+
+        /**
+         * The shard group is already compacting
+         */
+        COMPACTING
+    }
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
index d1ad18d..51fdf0c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
@@ -19,7 +19,9 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard;
 
 
+import java.util.Collection;
 import java.util.Iterator;
+import java.util.Set;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -44,19 +46,84 @@ public interface ShardedEdgeSerialization {
      * @param markedEdge The edge to write
      * @param timestamp The timestamp to use
      */
-    MutationBatch writeEdge( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
-                             UUID timestamp );
+    MutationBatch writeEdgeFromSource( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+                                       Collection<Shard> shards,  DirectedEdgeMeta sourceEdgeMeta,  UUID timestamp );
+
+
+    /**
+     * Write the edge from source->target
+     */
+    MutationBatch writeEdgeFromSourceWithTargetType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+                                                     MarkedEdge markedEdge, Collection<Shard> shards,  DirectedEdgeMeta sourceEdgeMeta, UUID timestamp );
+
+    /**
+     * Write the edge from target to source
+     */
+    MutationBatch writeEdgeToTarget( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+                                     Collection<Shard> shards,  DirectedEdgeMeta sourceEdgeMeta, UUID timestamp );
+
+
+    /**
+     * Write the edge from target to source with source type
+     */
+    MutationBatch writeEdgeToTargetWithSourceType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+                                                   MarkedEdge markedEdge, Collection<Shard> shards, DirectedEdgeMeta sourceEdgeMeta,  UUID timestamp );
+
+
+    /**
+        * EdgeWrite both the source--->Target edge and the target <----- source edge into the mutation
+        *
+        * @param columnFamilies The column families to use
+        * @param scope The org scope of the graph
+        * @param markedEdge The edge to write
+        * @param timestamp The timestamp to use
+        */
+       MutationBatch writeEdgeVersions( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+                                          Collection<Shard> shards,  DirectedEdgeMeta sourceEdgeMeta,  UUID timestamp );
+
+
 
     /**
-     * EdgeWrite both the source -->target edge and the target<--- source edge into the mutation
+     * EdgeWrite both the source--->Target edge and the target <----- source edge into the mutation
      *
      * @param columnFamilies The column families to use
      * @param scope The org scope of the graph
      * @param markedEdge The edge to write
-     * @param timestamp The timestamp of the uuid
+     * @param timestamp The timestamp to use
+     */
+    MutationBatch deleteEdgeFromSource( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+                                        MarkedEdge markedEdge, Collection<Shard> shards,  DirectedEdgeMeta sourceEdgeMeta, UUID timestamp );
+
+
+    /**
+     * Write the edge from source->target
+     */
+    MutationBatch deleteEdgeFromSourceWithTargetType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+                                                      MarkedEdge markedEdge, Collection<Shard> shards,  DirectedEdgeMeta sourceEdgeMeta,  UUID timestamp );
+
+    /**
+     * Write the edge from target to source
+     */
+    MutationBatch deleteEdgeToTarget( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+                                      Collection<Shard> shards, DirectedEdgeMeta sourceEdgeMeta,  UUID timestamp );
+
+
+    /**
+     * Write the edge from target to source with source type
      */
-    MutationBatch deleteEdge( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
-                              UUID timestamp );
+    MutationBatch deleteEdgeToTargetWithSourceType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+                                                    MarkedEdge markedEdge, Collection<Shard> shards,  DirectedEdgeMeta sourceEdgeMeta, UUID timestamp );
+
+    /**
+            * EdgeWrite both the source--->Target edge and the target <----- source edge into the mutation
+            *
+            * @param columnFamilies The column families to use
+            * @param scope The org scope of the graph
+            * @param markedEdge The edge to write
+            * @param timestamp The timestamp to use
+            */
+           MutationBatch deleteEdgeVersions( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+                                              Collection<Shard> shards,  DirectedEdgeMeta sourceEdgeMeta,  UUID timestamp );
 
 
     /**
@@ -65,10 +132,10 @@ public interface ShardedEdgeSerialization {
      * @param columnFamilies The column families to use
      * @param scope The application scope
      * @param search The search criteria
-     * @param shards The shards to iterate when searching
+     * @param shards The shards multiget when reading
      */
     Iterator<MarkedEdge> getEdgeVersions( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
-                                          SearchByEdge search, Iterator<ShardEntryGroup> shards );
+                                          SearchByEdge search, Collection<Shard> shards );
 
     /**
      * Get an iterator of all edges by edge type originating from source node
@@ -79,7 +146,7 @@ public interface ShardedEdgeSerialization {
      * @param shards The shards to iterate when searching
      */
     Iterator<MarkedEdge> getEdgesFromSource( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
-                                             SearchByEdgeType search, Iterator<ShardEntryGroup> shards );
+                                             SearchByEdgeType search, Collection<Shard> shards );
 
 
     /**
@@ -91,7 +158,7 @@ public interface ShardedEdgeSerialization {
      * @param shards The shards to iterate when searching
      */
     Iterator<MarkedEdge> getEdgesFromSourceByTargetType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
-                                                         SearchByIdType search, Iterator<ShardEntryGroup> shards );
+                                                         SearchByIdType search, Collection<Shard> shards );
 
     /**
      * Get an iterator of all edges by edge type pointing to the target node.  Returns all versions
@@ -102,7 +169,7 @@ public interface ShardedEdgeSerialization {
      * @param shards The shards to iterate when searching
      */
     Iterator<MarkedEdge> getEdgesToTarget( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
-                                           SearchByEdgeType search, Iterator<ShardEntryGroup> shards );
+                                           SearchByEdgeType search, Collection<Shard> shards );
 
 
     /**
@@ -115,5 +182,5 @@ public interface ShardedEdgeSerialization {
      * @param shards The shards to iterate when searching
      */
     Iterator<MarkedEdge> getEdgesToTargetBySourceType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
-                                                       SearchByIdType search, Iterator<ShardEntryGroup> shards );
+                                                       SearchByIdType search, Collection<Shard> shards );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
index d3d92ea..ed3daaf 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
@@ -127,7 +127,7 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
         }
         //column not found, return 0
         catch ( RuntimeException re ) {
-            if(re.getCause() instanceof NotFoundException) {
+            if(re.getCause().getCause() instanceof NotFoundException) {
                 return 0;
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
index b7cc25d..27862d0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
@@ -6,6 +6,7 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
@@ -29,19 +30,18 @@ import com.netflix.astyanax.util.RangeBuilder;
  * @param <C> The column type
  * @param <T> The parsed return type
  */
-public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Comparator<T>,
-        Iterator<List<ScopedRowKey<ApplicationScope, R>>> {
+public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Comparator<T> {
 
     protected final Optional<Edge> last;
     protected final long maxTimestamp;
     protected final ApplicationScope scope;
-    protected final Iterator<ShardEntryGroup> shards;
+    protected final Collection<Shard> shards;
 
 
     protected EdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional<Edge> last,
-                            final Iterator<ShardEntryGroup> shards ) {
+                            final Collection<Shard> shards ) {
 
-        Preconditions.checkArgument(shards.hasNext(), "Cannot search with no possible shards");
+        Preconditions.checkArgument(shards.size() > 0 , "Cannot search with no possible shards");
 
         this.scope = scope;
         this.maxTimestamp = maxTimestamp;
@@ -50,19 +50,12 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Compa
     }
 
 
-    @Override
-    public boolean hasNext() {
-        return shards.hasNext();
-    }
 
+    public List<ScopedRowKey<ApplicationScope, R>> getRowKeys() {
 
-    @Override
-    public List<ScopedRowKey<ApplicationScope, R>> next() {
-        Collection<Shard> readShards = shards.next().getReadShards();
+        List<ScopedRowKey<ApplicationScope, R>> rowKeys = new ArrayList<>(shards.size());
 
-        List<ScopedRowKey<ApplicationScope, R>> rowKeys = new ArrayList<>(readShards.size());
-
-        for(Shard shard : readShards){
+        for(Shard shard : shards){
 
             final ScopedRowKey<ApplicationScope, R> rowKey = ScopedRowKey
                     .fromKey( scope, generateRowKey(shard.getShardIndex() ) );
@@ -75,11 +68,6 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Compa
     }
 
 
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException( "Remove is unsupported" );
-    }
-
 
     /**
      * Set the range on a search
@@ -93,10 +81,7 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Compa
 
             builder.setStart( sourceEdge, getSerializer() );
         }
-        else {
-
 
-        }
     }
 
 
@@ -113,6 +98,7 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Compa
     }
 
 
+
     /**
      * Get the column's serializer
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 6c79814..b919ad7 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -42,7 +42,9 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
+import com.fasterxml.uuid.impl.UUIDUtil;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
@@ -167,7 +169,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
          */
 
         final Iterator<MarkedEdge> edges = directedEdgeMeta
-                .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup, Long.MAX_VALUE );
+                .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup.getReadShards(), Long.MAX_VALUE );
 
 
         if ( !edges.hasNext() ) {
@@ -218,15 +220,18 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
      */
     private boolean isNewNode( DirectedEdgeMeta directedEdgeMeta ) {
 
-        /**
-         * the max time in microseconds we can allow
-         */
-        final long maxTime = ( timeService.getCurrentTime() + graphFig.getShardCacheTimeout() ) * 10000;
+        //The timeout is in milliseconds.  Time for a time uuid is 1/10000 of a milli, so we need to get the units correct
+        final long uuidDelta =  graphFig.getShardCacheTimeout()  * 10000;
+
+        final long timeNow = UUIDGenerator.newTimeUUID().timestamp();
 
         for ( DirectedEdgeMeta.NodeMeta node : directedEdgeMeta.getNodes() ) {
+
             final long uuidTime = node.getId().getUuid().timestamp();
 
-            if ( uuidTime < maxTime ) {
+            final long uuidTimeDelta = uuidTime + uuidDelta;
+
+            if ( uuidTimeDelta < timeNow ) {
                 return true;
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index d3dd86e..d444eec 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -22,10 +22,16 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 import java.beans.PropertyChangeEvent;
 import java.beans.PropertyChangeListener;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
@@ -49,11 +55,19 @@ import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
 import com.google.common.cache.Weigher;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
 
+import rx.Observable;
+import rx.Subscriber;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
 
 /**
  * Simple implementation of the shard.  Uses a local Guava shard with a timeout.  If a value is not present in the
@@ -69,10 +83,14 @@ public class NodeShardCacheImpl implements NodeShardCache {
      */
     private static final int MAX_WEIGHT_PER_ELEMENT = 10000;
 
+
     private final NodeShardAllocation nodeShardAllocation;
     private final GraphFig graphFig;
     private final TimeService timeservice;
 
+
+
+    private ListeningScheduledExecutorService refreshExecutors;
     private LoadingCache<CacheKey, CacheEntry> graphs;
 
 
@@ -93,6 +111,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
         this.graphFig = graphFig;
         this.timeservice = timeservice;
 
+
         /**
          * Add our listener to reconstruct the shard
          */
@@ -102,7 +121,9 @@ public class NodeShardCacheImpl implements NodeShardCache {
                 final String propertyName = evt.getPropertyName();
 
                 if ( propertyName.equals( GraphFig.SHARD_CACHE_SIZE ) || propertyName
-                        .equals( GraphFig.SHARD_CACHE_TIMEOUT ) ) {
+                        .equals( GraphFig.SHARD_CACHE_TIMEOUT ) || propertyName
+                        .equals( GraphFig.SHARD_CACHE_REFRESH_WORKERS ) ) {
+
 
                     updateCache();
                 }
@@ -120,7 +141,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
     public ShardEntryGroup getWriteShardGroup( final ApplicationScope scope, final long timestamp,
                                                final DirectedEdgeMeta directedEdgeMeta ) {
 
-        ValidationUtils.validateApplicationScope(scope);
+        ValidationUtils.validateApplicationScope( scope );
         GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta );
 
         final CacheKey key = new CacheKey( scope, directedEdgeMeta );
@@ -147,7 +168,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
     @Override
     public Iterator<ShardEntryGroup> getReadShardGroup( final ApplicationScope scope, final long maxTimestamp,
                                                         final DirectedEdgeMeta directedEdgeMeta ) {
-        ValidationUtils.validateApplicationScope(scope);
+        ValidationUtils.validateApplicationScope( scope );
         GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta );
 
         final CacheKey key = new CacheKey( scope, directedEdgeMeta );
@@ -175,11 +196,25 @@ public class NodeShardCacheImpl implements NodeShardCache {
      * doesn't have to be precise.  The algorithm accounts for stale data.
      */
     private void updateCache() {
+        if ( this.refreshExecutors != null ) {
+            this.refreshExecutors.shutdown();
+        }
+
+        this.refreshExecutors = MoreExecutors
+                .listeningDecorator( Executors.newScheduledThreadPool( graphFig.getShardCacheRefreshWorkerCount() ) );
+
+
+        this.graphs = CacheBuilder.newBuilder()
+
+                //we want to asynchronously load new values for existing ones, that way we wont' have to
+                //wait for a trip to cassandra
+                .refreshAfterWrite( graphFig.getShardCacheTimeout(), TimeUnit.MILLISECONDS )
+
+                        //set our weight function, since not all shards are equal
+                .maximumWeight(MAX_WEIGHT_PER_ELEMENT * graphFig.getShardCacheSize() ).weigher( new ShardWeigher() )
 
-        this.graphs = CacheBuilder.newBuilder().expireAfterWrite( graphFig.getShardCacheSize(), TimeUnit.MILLISECONDS )
-                                  .removalListener( new ShardRemovalListener() )
-                                  .maximumWeight( MAX_WEIGHT_PER_ELEMENT * graphFig.getShardCacheSize() )
-                                  .weigher( new ShardWeigher() ).build( new ShardCacheLoader() );
+                        //set our shard loader
+                .build( new ShardCacheLoader() );
     }
 
 
@@ -296,77 +331,43 @@ public class NodeShardCacheImpl implements NodeShardCache {
 
 
         @Override
-        public CacheEntry load( final CacheKey key ) throws Exception {
+        public CacheEntry load( final CacheKey key ) {
 
 
             final Iterator<ShardEntryGroup> edges =
                     nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta );
 
-            return new CacheEntry( edges );
+            final CacheEntry cacheEntry = new CacheEntry( edges );
+
+            return cacheEntry;
         }
-    }
 
 
-    /**
-     * Calculates the weight of the entry by geting the size of the cache
-     */
-    final class ShardWeigher implements Weigher<CacheKey, CacheEntry> {
-
         @Override
-        public int weigh( final CacheKey key, final CacheEntry value ) {
-            return value.getCacheSize();
+        public ListenableFuture<CacheEntry> reload( final CacheKey key, final CacheEntry oldValue ) throws Exception {
+            ListenableFutureTask<CacheEntry> task = ListenableFutureTask.create( new Callable<CacheEntry>() {
+                public CacheEntry call() {
+                    return load( key );
+                }
+            } );
+            //load via the refresh executor
+            refreshExecutors.execute( task );
+            return task;
         }
+
+        //TODO, use RX for sliding window buffering and duplicate removal
     }
 
 
+
     /**
-     * On removal from the cache, we want to audit the maximum shard.  If it needs to allocate a new shard, we want to
-     * do so. IF there's a compaction pending, we want to run the compaction task
+     * Calculates the weight of the entry by geting the size of the cache
      */
-    final class ShardRemovalListener implements RemovalListener<CacheKey, CacheEntry> {
+    final class ShardWeigher implements Weigher<CacheKey, CacheEntry> {
 
         @Override
-        public void onRemoval( final RemovalNotification<CacheKey, CacheEntry> notification ) {
-
-
-            final CacheKey key = notification.getKey();
-            final CacheEntry entry = notification.getValue();
-
-
-            Iterator<ShardEntryGroup> groups = entry.getShards( Long.MAX_VALUE );
-
-
-            /**
-             * Start at our max, then
-             */
-
-            //audit all our groups
-            while ( groups.hasNext() ) {
-                ShardEntryGroup group = groups.next();
-
-                /**
-                 * We don't have a compaction pending.  Run an audit on the shards
-                 */
-                if ( !group.isCompactionPending() ) {
-                    LOG.debug( "No compaction pending, checking max shard on expiration" );
-                    /**
-                     * Check if we should allocate, we may want to
-                     */
-
-
-                    nodeShardAllocation.auditShard( key.scope, group, key.directedEdgeMeta );
-                    continue;
-                }
-                /**
-                 * Do the compaction
-                 */
-                if ( group.shouldCompact( timeservice.getCurrentTime() ) ) {
-                    //launch the compaction
-                }
-
-                //no op, there's nothing we need to do to this shard
-
-            }
+        public int weigh( final CacheKey key, final CacheEntry value ) {
+            return value.getCacheSize();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
new file mode 100644
index 0000000..8779b96
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
@@ -0,0 +1,130 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.MultiKeyColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.model.ConsistencyLevel;
+import com.netflix.astyanax.query.RowQuery;
+import com.netflix.astyanax.util.RangeBuilder;
+
+
+/**
+ *
+ * Iterator to keep iterating over multiple shard groups to stream results
+ *
+ * @param <T> The parsed return type
+ */
+public abstract class ShardGroupColumnIterator<T> implements Iterator<T> {
+
+
+    private final Iterator<ShardEntryGroup> entryGroupIterator;
+    private Iterator<T> elements;
+
+
+    public ShardGroupColumnIterator( final Iterator<ShardEntryGroup> entryGroupIterator ){
+        this.entryGroupIterator = entryGroupIterator;
+    }
+
+
+    @Override
+    public boolean hasNext() {
+
+
+        if(elements == null){
+            return advance();
+        }
+
+        if(elements.hasNext()){
+            return true;
+        }
+
+        //we've exhausted our shard groups and we don't have a next, we can't continue
+        if(!entryGroupIterator.hasNext()){
+            return false;
+        }
+
+
+        return advance();
+    }
+
+
+    @Override
+    public T next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "There are no more rows or columns left to advance" );
+        }
+
+        return elements.next();
+    }
+
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException( "Remove is unsupported" );
+    }
+
+
+    /**
+     * Get an iterator for the shard entry group
+     * @param readShards the read shards to use
+     * @return
+     */
+    protected abstract Iterator<T> getIterator(Collection<Shard> readShards);
+
+
+    public boolean advance(){
+
+        while(entryGroupIterator.hasNext()){
+
+            final ShardEntryGroup group = entryGroupIterator.next();
+
+            elements = getIterator( group.getReadShards() );
+
+            /**
+             * We're done, we have some columns to return
+             */
+            if(elements.hasNext()){
+                return true;
+            }
+
+        }
+
+
+        return false;
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 48336cd..c566d43 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -22,18 +22,45 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 
+import java.nio.charset.Charset;
+import java.util.BitSet;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
+import org.apache.usergrid.persistence.core.rx.ObservableIterator;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Preconditions;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 import rx.Observable;
+import rx.functions.Action0;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
 
 
 /**
@@ -43,20 +70,56 @@ import rx.Observable;
 public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
 
+    private static final Charset CHARSET = Charset.forName( "UTF-8" );
+
+    private static final HashFunction MURMUR_128 = Hashing.murmur3_128();
+
     private final TimeService timeService;
+    private final GraphFig graphFig;
+    private final NodeShardAllocation nodeShardAllocation;
+    private final ShardedEdgeSerialization shardedEdgeSerialization;
+    private final EdgeColumnFamilies edgeColumnFamilies;
+    private final EdgeShardSerialization edgeShardSerialization;
+
+
+    private final Random random;
+    private final ShardCompactionTaskTracker shardCompactionTaskTracker;
 
 
     @Inject
-    public ShardGroupCompactionImpl( final TimeService timeService ) {this.timeService = timeService;}
+    public ShardGroupCompactionImpl( final TimeService timeService, final GraphFig graphFig,
+                                     final NodeShardAllocation nodeShardAllocation,
+                                     final ShardedEdgeSerialization shardedEdgeSerialization,
+                                     final EdgeColumnFamilies edgeColumnFamilies,
+                                     final EdgeShardSerialization edgeShardSerialization ) {
+
+        this.timeService = timeService;
+        this.graphFig = graphFig;
+        this.nodeShardAllocation = nodeShardAllocation;
+        this.shardedEdgeSerialization = shardedEdgeSerialization;
+        this.edgeColumnFamilies = edgeColumnFamilies;
+        this.edgeShardSerialization = edgeShardSerialization;
+
+        this.random = new Random();
+        this.shardCompactionTaskTracker = new ShardCompactionTaskTracker();
+    }
 
 
     @Override
-    public Observable<Integer> compact( final ShardEntryGroup group ) {
+    public Set<Shard> compact( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+                               final ShardEntryGroup group ) {
         final long startTime = timeService.getCurrentTime();
 
-        Preconditions.checkNotNull(group, "group cannot be null");
+        Preconditions.checkNotNull( group, "group cannot be null" );
         Preconditions.checkArgument( group.isCompactionPending(), "Compaction is pending" );
-        Preconditions.checkArgument( group.shouldCompact(startTime  ), "Compaction can now be run" );
+        Preconditions.checkArgument( group.shouldCompact( startTime ), "Compaction can now be run" );
+
+        /**
+         * It's already compacting, don't do anything
+         */
+        if (!shardCompactionTaskTracker.shouldStartCompaction( scope, edgeMeta, group )){
+            return Collections.emptySet();
+        }
 
 
         final Shard targetShard = group.getCompactionTarget();
@@ -64,12 +127,181 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         final Collection<Shard> sourceShards = group.getReadShards();
 
 
-        //now get iterators for each of the source shards, and then copy them to the compaction target shard
+
+        Observable.create( new ObservableIterator<MarkedEdge>( "Shard_Repair" ) {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+                return edgeMeta.loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, group.getReadShards(), Long.MAX_VALUE );
+            }
+        } ).buffer( graphFig.getScanPageSize() ).doOnNext( new Action1<List<MarkedEdge>>() {
+            @Override
+            public void call( final List<MarkedEdge> markedEdges ) {
+
+            }
+        }).doOnNext( new Action1<List<MarkedEdge>>() {
+            @Override
+            public void call( final List<MarkedEdge> markedEdges ) {
+
+            }
+        } );
+
 
 
 
 
         return null;
+    }
+
+
+    @Override
+    public AuditResult evaluateShardGroup( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+                                           final ShardEntryGroup group ) {
+
+
+        final double repairChance = random.nextDouble();
+
+        //don't repair
+        if ( repairChance > graphFig.getShardRepairChance() ) {
+            return AuditResult.NOT_CHECKED;
+        }
+
+
+        /**
+         * We don't have a compaction pending.  Run an audit on the shards
+         */
+        if ( !group.isCompactionPending() ) {
+
+            /**
+             * Check if we should allocate, we may want to
+             */
+
+
+            final boolean created = nodeShardAllocation.auditShard( scope, group, edgeMeta );
+
+
+            if ( !created ) {
+                return AuditResult.CHECKED_NO_OP;
+            }
+
+
+            return AuditResult.CHECKED_CREATED;
+        }
+
+        //check our taskmanager
+
+
+        /**
+         * Do the compaction
+         */
+        if ( group.shouldCompact( timeService.getCurrentTime() ) ) {
+            compact( scope, edgeMeta, group );
+            return AuditResult.COMPACTING;
+        }
+
+        //no op, there's nothing we need to do to this shard
+        return AuditResult.NOT_CHECKED;
+    }
+
+
+    private static final class ShardCompactionTaskTracker {
+        private BitSet runningTasks = new BitSet();
+
+
+        /**
+         * Sets this data into our scope to signal it's running to stop other threads from attempting to run
+         * @param scope
+         * @param edgeMeta
+         * @param group
+         * @return
+         */
+        public boolean shouldStartCompaction( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+                                     ShardEntryGroup group ) {
+            final int hash = doHash( scope, edgeMeta, group ).asInt();
+
+            if(runningTasks.get( hash )){
+                return false;
+            }
+
+            runningTasks.set( hash );
+
+            return true;
+        }
+
+
+        /**
+         * Mark this entry group as complete
+         * @param scope
+         * @param edgeMeta
+         * @param group
+         */
+        public void complete( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+                                             ShardEntryGroup group ) {
+            final int hash = doHash( scope, edgeMeta, group ).asInt();
+            runningTasks.clear( hash );
+        }
+
+
+        /**
+         * Hash our data into a consistent long
+         * @param scope
+         * @param directedEdgeMeta
+         * @param shardEntryGroup
+         * @return
+         */
+        private HashCode doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta,
+                                 final ShardEntryGroup shardEntryGroup ) {
+
+            final Hasher hasher = MURMUR_128.newHasher();
+
+
+            addToHash( hasher, scope.getApplication() );
+
+            /**
+             * add our edge meta data
+             */
+            for ( DirectedEdgeMeta.NodeMeta nodeMeta : directedEdgeMeta.getNodes() ) {
+                addToHash( hasher, nodeMeta.getId() );
+                hasher.putInt( nodeMeta.getNodeType().getStorageValue() );
+            }
+
+
+            /**
+             * Add our edge type
+             */
+            for ( String type : directedEdgeMeta.getTypes() ) {
+                hasher.putString( type, CHARSET );
+            }
+
+            //add our compaction target to the hash
+            final Shard compactionTarget = shardEntryGroup.getCompactionTarget();
+
+            hasher.putLong( compactionTarget.getShardIndex() );
+
+
+            return hasher.hash();
+        }
+
+
+        private void addToHash( final Hasher hasher, final Id id ) {
+
+            final UUID nodeUuid = id.getUuid();
+            final String nodeType = id.getType();
+
+            hasher.putLong( nodeUuid.getMostSignificantBits() ).putLong( nodeUuid.getLeastSignificantBits() )
+                  .putString( nodeType, CHARSET );
+        }
+    }
+
+    private enum StartResult{
+        /**
+         * Returned if the compaction was started
+         */
+
+        STARTED,
 
+        /**
+         * Returned if we are running the compaction
+         */
+        RUNNING;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
deleted file mode 100644
index 0bbb011..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
+++ /dev/null
@@ -1,134 +0,0 @@
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
-import org.apache.usergrid.persistence.core.astyanax.MultiKeyColumnNameIterator;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.model.ConsistencyLevel;
-import com.netflix.astyanax.query.RowQuery;
-import com.netflix.astyanax.util.RangeBuilder;
-
-
-/**
- * Internal iterator to iterate over multiple row keys
- *
- * @param <R> The row type
- * @param <C> The column type
- * @param <T> The parsed return type
- */
-public class ShardRowIterator<R, C, T> implements Iterator<T> {
-
-    private final EdgeSearcher<R, C, T> searcher;
-
-    private final MultiTennantColumnFamily<ApplicationScope, R, C> cf;
-
-    private Iterator<T> currentColumnIterator;
-
-    private final Keyspace keyspace;
-
-    private final int pageSize;
-
-    private final ConsistencyLevel consistencyLevel;
-
-
-    public ShardRowIterator( final EdgeSearcher<R, C, T> searcher,
-                             final MultiTennantColumnFamily<ApplicationScope, R, C> cf, final Keyspace keyspace,
-                             final ConsistencyLevel consistencyLevel, final int pageSize ) {
-        this.searcher = searcher;
-        this.cf = cf;
-        this.keyspace = keyspace;
-        this.pageSize = pageSize;
-        this.consistencyLevel = consistencyLevel;
-    }
-
-
-    @Override
-    public boolean hasNext() {
-        //we have more columns to return
-        if ( currentColumnIterator != null && currentColumnIterator.hasNext() ) {
-            return true;
-        }
-
-        /**
-         * We have another row key, advance to it and re-check
-         */
-        if ( searcher.hasNext() ) {
-            advanceRow();
-            return hasNext();
-        }
-
-        //we have no more columns, and no more row keys, we're done
-        return false;
-    }
-
-
-    @Override
-    public T next() {
-        if ( !hasNext() ) {
-            throw new NoSuchElementException( "There are no more rows or columns left to advance" );
-        }
-
-        return currentColumnIterator.next();
-    }
-
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException( "Remove is unsupported" );
-    }
-
-
-    /**
-     * Advance our iterator to the next row (assumes the check for row keys is elsewhere)
-     */
-    private void advanceRow() {
-
-        /**
-         * If the edge is present, we need to being seeking from this
-         */
-
-        final RangeBuilder rangeBuilder = new RangeBuilder().setLimit( pageSize );
-
-
-        //set the range into the search
-        searcher.setRange( rangeBuilder );
-
-        /**
-         * Get our list of slices
-         */
-        final List<ScopedRowKey<ApplicationScope, R>> rowKeys = searcher.next();
-
-
-        final List<ColumnNameIterator<C, T>> columnNameIterators = new ArrayList<>( rowKeys.size() );
-
-        for(ScopedRowKey<ApplicationScope, R> rowKey: rowKeys){
-
-
-
-           final  RowQuery<ScopedRowKey<ApplicationScope, R>, C> query =
-                    keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKey( rowKey )
-                            .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
-
-
-            final ColumnNameIterator<C, T> columnNameIterator = new ColumnNameIterator<>( query, searcher, searcher.hasPage() );
-
-            columnNameIterators.add( columnNameIterator );
-
-        }
-
-
-
-        currentColumnIterator = new MultiKeyColumnNameIterator<>(columnNameIterators, searcher, pageSize);
-
-
-    }
-}


[2/3] Finished refactor of low level serialization to remove impedance mismatch of apis between read and write.

Posted by to...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
index 6f7c5ce..030e4a7 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
@@ -21,6 +21,7 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.UUID;
 
@@ -47,7 +48,6 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardS
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -62,6 +62,10 @@ import com.netflix.astyanax.util.RangeBuilder;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 
+/**
+ * TODO: Rafactor this to use shards only, no shard groups, just collections of shards.  The parent caller can aggregate
+ * the results of multiple groups together, this has an impedance mismatch in the API layer.
+ */
 @Singleton
 public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
 
@@ -94,224 +98,254 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
 
 
     @Override
-    public MutationBatch writeEdge( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
-                                    final MarkedEdge markedEdge, final UUID timestamp  ) {
+    public MutationBatch writeEdgeFromSource( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                              final MarkedEdge markedEdge, final Collection<Shard> shards,
+                                              final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
         ValidationUtils.validateApplicationScope( scope );
         GraphValidation.validateEdge( markedEdge );
         ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
 
+        return new SourceWriteOp( columnFamilies, markedEdge ) {
 
-        final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() )
-                                            .withTimestamp( timestamp.timestamp() );
-
-        final boolean isDeleted = markedEdge.isDeleted();
-
-
-        doWrite( columnFamilies, scope, markedEdge, new RowOp<RowKey>() {
             @Override
-            public void writeEdge( final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily,
-                                   final RowKey rowKey, final DirectedEdge edge ) {
-                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( edge, isDeleted );
-            }
-
-
-            @Override
-            public void countEdge( final Shard shard, final DirectedEdgeMeta directedEdgeMeta ) {
-               if(!isDeleted) {
-                   writeEdgeShardStrategy.increment( scope, shard, 1,  directedEdgeMeta );
-               }
-            }
-
+            void writeEdge( final MutationBatch batch,
+                            final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily,
+                            final ApplicationScope scope, final RowKey rowKey, final DirectedEdge edge,
+                            final Shard shard, final boolean isDeleted ) {
 
+                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( edge, isDeleted );
 
-            @Override
-            public void writeVersion( final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily,
-                                      final EdgeRowKey rowKey, final long timestamp ) {
-                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( timestamp, isDeleted );
+                if ( !isDeleted ) {
+                    writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
+                }
             }
-        } );
-
-
-        return batch;
+        }.createBatch( scope, shards, timestamp );
     }
 
 
     @Override
-    public MutationBatch deleteEdge( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
-                                     final MarkedEdge markedEdge, final UUID timestamp ) {
+    public MutationBatch writeEdgeFromSourceWithTargetType( final EdgeColumnFamilies columnFamilies,
+                                                            final ApplicationScope scope, final MarkedEdge markedEdge,
+                                                            final Collection<Shard> shards,
+                                                            final DirectedEdgeMeta directedEdgeMeta,
+                                                            final UUID timestamp ) {
         ValidationUtils.validateApplicationScope( scope );
         GraphValidation.validateEdge( markedEdge );
         ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
 
 
-        final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() )
-                                            .withTimestamp( timestamp.timestamp() );
+        return new SourceTargetTypeWriteOp( columnFamilies, markedEdge ) {
 
-
-        doWrite( columnFamilies, scope, markedEdge, new RowOp<RowKey>() {
             @Override
-            public void writeEdge( final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily,
-                                   final RowKey rowKey, final DirectedEdge edge ) {
-                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( edge );
-            }
+            void writeEdge( final MutationBatch batch,
+                            final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily,
+                            final ApplicationScope scope, final RowKeyType rowKey, final DirectedEdge edge,
+                            final Shard shard, final boolean isDeleted ) {
 
 
-            @Override
-            public void countEdge(  final Shard shard, final DirectedEdgeMeta directedEdgeMeta ) {
-                writeEdgeShardStrategy.increment( scope,  shard, -1, directedEdgeMeta );
-            }
+                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( edge, isDeleted );
 
 
-            @Override
-            public void writeVersion( final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily,
-                                      final EdgeRowKey rowKey, final long timestamp ) {
-                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( timestamp );
+                if ( !isDeleted ) {
+                    writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
+                }
             }
-        } );
-
-
-        return batch;
+        }.createBatch( scope, shards, timestamp );
     }
 
 
-    /**
-     * EdgeWrite the edges internally
-     *
-     * @param scope The scope to encapsulate
-     * @param edge The edge to write
-     * @param op The row operation to invoke
-     */
-    private void doWrite( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope, final MarkedEdge edge,
-                          final RowOp op ) {
+    @Override
+    public MutationBatch writeEdgeToTarget( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                            final MarkedEdge markedEdge, final Collection<Shard> shards,
+                                            final DirectedEdgeMeta targetEdgeMeta, final UUID timestamp ) {
         ValidationUtils.validateApplicationScope( scope );
-        GraphValidation.validateEdge( edge );
-
-        final Id sourceNodeId = edge.getSourceNode();
-        final String sourceNodeType = sourceNodeId.getType();
-        final Id targetNodeId = edge.getTargetNode();
-        final String targetNodeType = targetNodeId.getType();
-        final long timestamp = edge.getTimestamp();
-        final String type = edge.getType();
-
-
-        /**
-         * Key in the serializers based on the edge
-         */
-
+        GraphValidation.validateEdge( markedEdge );
+        ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
 
-        /**
-         * write edges from source->target
-         */
 
+        return new TargetWriteOp( columnFamilies, markedEdge ) {
 
-        final long time = timeService.getCurrentTime();
+            @Override
+            void writeEdge( final MutationBatch batch,
+                            final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily,
+                            final ApplicationScope scope, final RowKey rowKey, final DirectedEdge edge,
+                            final Shard shard, final boolean isDeleted ) {
 
-        final DirectedEdge sourceEdge = new DirectedEdge( targetNodeId, timestamp );
+                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( edge, isDeleted );
 
-        final DirectedEdgeMeta sourceEdgeMeta =  DirectedEdgeMeta.fromSourceNode( sourceNodeId, type );
 
+                if ( !isDeleted ) {
+                    writeEdgeShardStrategy.increment( scope, shard, 1, targetEdgeMeta );
+                }
+            }
+        }.createBatch( scope, shards, timestamp );
+    }
 
 
-        final ShardEntryGroup sourceRowKeyShard =
-                writeEdgeShardStrategy.getWriteShards( scope, timestamp, sourceEdgeMeta );
+    @Override
+    public MutationBatch writeEdgeToTargetWithSourceType( final EdgeColumnFamilies columnFamilies,
+                                                          final ApplicationScope scope, final MarkedEdge markedEdge,
+                                                          final Collection<Shard> shards,
+                                                          final DirectedEdgeMeta directedEdgeMeta,
+                                                          final UUID timestamp ) {
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateEdge( markedEdge );
+        ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
 
-        final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> sourceCf =
-                columnFamilies.getSourceNodeCfName();
 
+        return new TargetSourceTypeWriteOp( columnFamilies, markedEdge ) {
 
-        for ( Shard shard : sourceRowKeyShard.getWriteShards(time) ) {
+            @Override
+            void writeEdge( final MutationBatch batch,
+                            final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily,
+                            final ApplicationScope scope, final RowKeyType rowKey, final DirectedEdge edge,
+                            final Shard shard, final boolean isDeleted ) {
 
-            final long shardId = shard.getShardIndex();
-            final RowKey sourceRowKey = new RowKey( sourceNodeId, type, shardId );
-            op.writeEdge( sourceCf, sourceRowKey, sourceEdge );
-            op.countEdge( shard, sourceEdgeMeta );
-        }
+                batch.withRow( columnFamilies.getTargetNodeSourceTypeCfName(), ScopedRowKey.fromKey( scope, rowKey ) )
+                     .putColumn( edge, isDeleted );
 
 
+                if ( !isDeleted ) {
+                    writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
+                }
+            }
+        }.createBatch( scope, shards, timestamp );
+    }
 
-        final DirectedEdgeMeta sourceEdgeTargetTypeMeta =  DirectedEdgeMeta.fromSourceNodeTargetType( sourceNodeId,
-                type, targetNodeType );
 
+    @Override
+    public MutationBatch writeEdgeVersions( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                            final MarkedEdge markedEdge, final Collection<Shard> shards,
+                                            final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
 
-        final ShardEntryGroup sourceWithTypeRowKeyShard = writeEdgeShardStrategy
-                .getWriteShards( scope, timestamp, sourceEdgeTargetTypeMeta );
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateEdge( markedEdge );
+        ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
 
-        final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> targetCf =
-                columnFamilies.getSourceNodeTargetTypeCfName();
 
-        for ( Shard shard : sourceWithTypeRowKeyShard.getWriteShards(time) ) {
+        return new EdgeVersions( columnFamilies, markedEdge ) {
 
-            final long shardId = shard.getShardIndex();
-            final RowKeyType sourceRowKeyType = new RowKeyType( sourceNodeId, type, targetNodeId, shardId );
+            @Override
+            void writeEdge( final MutationBatch batch,
+                            final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily,
+                            final ApplicationScope scope, final EdgeRowKey rowKey, final Long column, final Shard shard,
+                            final boolean isDeleted ) {
+                batch.withRow( columnFamilies.getGraphEdgeVersions(), ScopedRowKey.fromKey( scope, rowKey ) )
+                     .putColumn( column, isDeleted );
 
-            op.writeEdge( targetCf, sourceRowKeyType, sourceEdge );
-            op.countEdge( shard, sourceEdgeTargetTypeMeta );
-        }
 
+                if ( !isDeleted ) {
+                    writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
+                }
+            }
+        }.createBatch( scope, shards, timestamp );
+    }
 
-        /**
-         * write edges from target<-source
-         */
 
-        final DirectedEdge targetEdge = new DirectedEdge( sourceNodeId, timestamp );
+    @Override
+    public MutationBatch deleteEdgeFromSource( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                               final MarkedEdge markedEdge, final Collection<Shard> shards,
+                                               final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
 
-        final DirectedEdgeMeta targetEdgeMeta =  DirectedEdgeMeta.fromTargetNode( targetNodeId, type );
+        return new SourceWriteOp( columnFamilies, markedEdge ) {
 
+            @Override
+            void writeEdge( final MutationBatch batch,
+                            final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily,
+                            final ApplicationScope scope, final RowKey rowKey, final DirectedEdge edge,
+                            final Shard shard, final boolean isDeleted ) {
 
+                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( edge );
+            }
+        }.createBatch( scope, shards, timestamp );
+    }
 
-        final ShardEntryGroup targetRowKeyShard =
-                writeEdgeShardStrategy.getWriteShards( scope, timestamp, targetEdgeMeta );
 
-        final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> sourceByTargetCf =
-                columnFamilies.getTargetNodeCfName();
+    @Override
+    public MutationBatch deleteEdgeFromSourceWithTargetType( final EdgeColumnFamilies columnFamilies,
+                                                             final ApplicationScope scope, final MarkedEdge markedEdge,
+                                                             final Collection<Shard> shards,
+                                                             final DirectedEdgeMeta directedEdgeMeta,
+                                                             final UUID timestamp ) {
+        return new SourceTargetTypeWriteOp( columnFamilies, markedEdge ) {
 
-        for ( Shard shard : targetRowKeyShard.getWriteShards(time) ) {
-            final long shardId = shard.getShardIndex();
-            final RowKey targetRowKey = new RowKey( targetNodeId, type, shardId );
+            @Override
+            void writeEdge( final MutationBatch batch,
+                            final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily,
+                            final ApplicationScope scope, final RowKeyType rowKey, final DirectedEdge edge,
+                            final Shard shard, final boolean isDeleted ) {
 
-            op.writeEdge( sourceByTargetCf, targetRowKey, targetEdge );
-            op.countEdge( shard, targetEdgeMeta );
-        }
 
+                batch.withRow( columnFamilies.getSourceNodeTargetTypeCfName(), ScopedRowKey.fromKey( scope, rowKey ) )
+                     .deleteColumn( edge );
+            }
+        }.createBatch( scope, shards, timestamp );
+    }
 
 
-        final DirectedEdgeMeta targetEdgeSourceTypeMeta =  DirectedEdgeMeta.fromTargetNodeSourceType( targetNodeId, type, sourceNodeType );
+    @Override
+    public MutationBatch deleteEdgeToTarget( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                             final MarkedEdge markedEdge, final Collection<Shard> shards,
+                                             final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
 
+        return new TargetWriteOp( columnFamilies, markedEdge ) {
 
-        final ShardEntryGroup targetWithTypeRowKeyShard = writeEdgeShardStrategy
-                .getWriteShards( scope, timestamp, targetEdgeSourceTypeMeta );
+            @Override
+            void writeEdge( final MutationBatch batch,
+                            final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily,
+                            final ApplicationScope scope, final RowKey rowKey, final DirectedEdge edge,
+                            final Shard shard, final boolean isDeleted ) {
 
-        final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> targetBySourceCf =
-                columnFamilies.getTargetNodeSourceTypeCfName();
+                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( edge );
+            }
+        }.createBatch( scope, shards, timestamp );
+    }
 
 
-        for ( Shard shard : targetWithTypeRowKeyShard.getWriteShards(time) ) {
+    @Override
+    public MutationBatch deleteEdgeToTargetWithSourceType( final EdgeColumnFamilies columnFamilies,
+                                                           final ApplicationScope scope, final MarkedEdge markedEdge,
+                                                           final Collection<Shard> shards,
+                                                           final DirectedEdgeMeta directedEdgeMeta,
+                                                           final UUID timestamp ) {
 
-            final long shardId = shard.getShardIndex();
+        return new TargetSourceTypeWriteOp( columnFamilies, markedEdge ) {
 
-            final RowKeyType targetRowKeyType = new RowKeyType( targetNodeId, type, sourceNodeId, shardId );
+            @Override
+            void writeEdge( final MutationBatch batch,
+                            final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily,
+                            final ApplicationScope scope, final RowKeyType rowKey, final DirectedEdge edge,
+                            final Shard shard, final boolean isDeleted ) {
 
+                batch.withRow( columnFamilies.getTargetNodeSourceTypeCfName(), ScopedRowKey.fromKey( scope, rowKey ) )
+                     .deleteColumn( edge );
+            }
+        }.createBatch( scope, shards, timestamp );
+    }
 
-            op.writeEdge( targetBySourceCf, targetRowKeyType, targetEdge );
-            op.countEdge( shard, targetEdgeSourceTypeMeta );
-        }
 
-        /**
-         * Always a 0l shard, we're hard limiting 2b timestamps for the same edge
-         */
-        final EdgeRowKey edgeRowKey = new EdgeRowKey( sourceNodeId, type, targetNodeId, 0l );
+    @Override
+    public MutationBatch deleteEdgeVersions( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                             final MarkedEdge markedEdge, final Collection<Shard> shards,
+                                             final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
 
+        return new EdgeVersions( columnFamilies, markedEdge ) {
 
-        /**
-         * Write this in the timestamp log for this edge of source->target
-         */
-        op.writeVersion( columnFamilies.getGraphEdgeVersions(), edgeRowKey, timestamp );
+            @Override
+            void writeEdge( final MutationBatch batch,
+                            final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily,
+                            final ApplicationScope scope, final EdgeRowKey rowKey, final Long column, final Shard shard,
+                            final boolean isDeleted ) {
+                batch.withRow( columnFamilies.getGraphEdgeVersions(), ScopedRowKey.fromKey( scope, rowKey ) )
+                     .deleteColumn( column );
+            }
+        }.createBatch( scope, shards, timestamp );
     }
 
 
     @Override
     public Iterator<MarkedEdge> getEdgeVersions( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
-                                                 final SearchByEdge search, final Iterator<ShardEntryGroup> shards ) {
+                                                 final SearchByEdge search, final Collection<Shard> shards ) {
         ValidationUtils.validateApplicationScope( scope );
         GraphValidation.validateSearchByEdge( search );
 
@@ -370,7 +404,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
                     }
                 };
 
-        return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+        return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
                 graphFig.getScanPageSize() );
     }
 
@@ -378,7 +412,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
     @Override
     public Iterator<MarkedEdge> getEdgesFromSource( final EdgeColumnFamilies columnFamilies,
                                                     final ApplicationScope scope, final SearchByEdgeType edgeType,
-                                                    final Iterator<ShardEntryGroup> shards ) {
+                                                    final Collection<Shard> shards ) {
 
         ValidationUtils.validateApplicationScope( scope );
         GraphValidation.validateSearchByEdgeType( edgeType );
@@ -392,7 +426,8 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
 
 
         final SourceEdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
-                new SourceEdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+                new SourceEdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
+                        shards ) {
 
 
                     @Override
@@ -417,12 +452,10 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
                     protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
                         return new SimpleMarkedEdge( sourceId, type, edge.id, edge.timestamp, marked );
                     }
-
-
                 };
 
 
-        return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+        return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
                 graphFig.getScanPageSize() );
     }
 
@@ -431,7 +464,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
     public Iterator<MarkedEdge> getEdgesFromSourceByTargetType( final EdgeColumnFamilies columnFamilies,
                                                                 final ApplicationScope scope,
                                                                 final SearchByIdType edgeType,
-                                                                final Iterator<ShardEntryGroup> shards ) {
+                                                                final Collection<Shard> shards ) {
 
         ValidationUtils.validateApplicationScope( scope );
         GraphValidation.validateSearchByEdgeType( edgeType );
@@ -446,7 +479,8 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
 
 
         final SourceEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
-                new SourceEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+                new SourceEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
+                        shards ) {
 
                     @Override
                     protected Serializer<DirectedEdge> getSerializer() {
@@ -470,22 +504,16 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
                     protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
                         return new SimpleMarkedEdge( targetId, type, edge.id, edge.timestamp, marked );
                     }
-
-
-
                 };
 
-        return new ShardRowIterator( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+        return new ShardsColumnIterator( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
                 graphFig.getScanPageSize() );
     }
 
 
-
-
     @Override
     public Iterator<MarkedEdge> getEdgesToTarget( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
-                                                  final SearchByEdgeType edgeType,
-                                                  final Iterator<ShardEntryGroup> shards ) {
+                                                  final SearchByEdgeType edgeType, final Collection<Shard> shards ) {
         ValidationUtils.validateApplicationScope( scope );
         GraphValidation.validateSearchByEdgeType( edgeType );
 
@@ -497,7 +525,8 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
         final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
 
         final TargetEdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
-                new TargetEdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+                new TargetEdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
+                        shards ) {
 
                     @Override
                     protected Serializer<DirectedEdge> getSerializer() {
@@ -524,7 +553,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
                 };
 
 
-        return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+        return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
                 graphFig.getScanPageSize() );
     }
 
@@ -533,7 +562,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
     public Iterator<MarkedEdge> getEdgesToTargetBySourceType( final EdgeColumnFamilies columnFamilies,
                                                               final ApplicationScope scope,
                                                               final SearchByIdType edgeType,
-                                                              final Iterator<ShardEntryGroup> shards ) {
+                                                              final Collection<Shard> shards ) {
 
         ValidationUtils.validateApplicationScope( scope );
         GraphValidation.validateSearchByEdgeType( edgeType );
@@ -548,7 +577,8 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
 
 
         final TargetEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
-                new TargetEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+                new TargetEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
+                        shards ) {
                     @Override
                     protected Serializer<DirectedEdge> getSerializer() {
                         return serializer;
@@ -573,7 +603,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
                     }
                 };
 
-        return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+        return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
                 graphFig.getScanPageSize() );
     }
 
@@ -581,74 +611,387 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
     /**
      * Class for performing searched on rows based on source id
      */
-    private static abstract class SourceEdgeSearcher<R, C, T extends Edge> extends EdgeSearcher<R, C, T>{
+    private static abstract class SourceEdgeSearcher<R, C, T extends Edge> extends EdgeSearcher<R, C, T> {
 
         protected SourceEdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional<Edge> last,
-                                      final Iterator<ShardEntryGroup> shards ) {
+                                      final Collection<Shard> shards ) {
             super( scope, maxTimestamp, last, shards );
         }
 
 
         public int compare( final T o1, final T o2 ) {
-            int compare = Long.compare(o1.getTimestamp(), o2.getTimestamp());
+            int compare = Long.compare( o1.getTimestamp(), o2.getTimestamp() );
 
-            if(compare == 0){
-                compare = o1.getTargetNode().compareTo( o2.getTargetNode());
+            if ( compare == 0 ) {
+                compare = o1.getTargetNode().compareTo( o2.getTargetNode() );
             }
 
             return compare;
         }
-
-
     }
 
 
     /**
      * Class for performing searched on rows based on target id
      */
-    private static abstract class TargetEdgeSearcher<R, C, T extends Edge> extends EdgeSearcher<R, C, T>{
+    private static abstract class TargetEdgeSearcher<R, C, T extends Edge> extends EdgeSearcher<R, C, T> {
 
         protected TargetEdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional<Edge> last,
-                                      final Iterator<ShardEntryGroup> shards ) {
+                                      final Collection<Shard> shards ) {
             super( scope, maxTimestamp, last, shards );
         }
 
 
         public int compare( final T o1, final T o2 ) {
-            int compare = Long.compare(o1.getTimestamp(), o2.getTimestamp());
+            int compare = Long.compare( o1.getTimestamp(), o2.getTimestamp() );
 
-            if(compare == 0){
-                compare = o1.getTargetNode().compareTo( o2.getTargetNode());
+            if ( compare == 0 ) {
+                compare = o1.getTargetNode().compareTo( o2.getTargetNode() );
             }
 
             return compare;
         }
-
-
     }
 
+
     /**
      * Simple callback to perform puts and deletes with a common row setup code
      *
      * @param <R> The row key type
+     * @param <C> The column type
      */
-    private static interface RowOp<R> {
+    private abstract class RowOp<R, C> {
+
+
+        /**
+         * Return the column family used for the write
+         */
+        protected abstract MultiTennantColumnFamily<ApplicationScope, R, C> getColumnFamily();
+
+        /**
+         * Get the row key
+         */
+        public abstract R getRowKey( final Shard shard );
+
+        /**
+         * Get the column family value
+         */
+        protected abstract C getDirectedEdge();
+
+        /**
+         * Get the flag on if it's deleted
+         */
+        protected abstract boolean isDeleted();
+
 
         /**
          * Write the edge with the given data
          */
-        void writeEdge( final MultiTennantColumnFamily<ApplicationScope, R, DirectedEdge> columnFamily, final  R rowKey,
-                        final DirectedEdge edge );
+        abstract void writeEdge( final MutationBatch batch,
+                                 final MultiTennantColumnFamily<ApplicationScope, R, C> columnFamily,
+                                 final ApplicationScope scope, final R rowKey, final C column, final Shard shard,
+                                 final boolean isDeleted );
+
+
+        /**
+         * Create a mutation batch
+         */
+        public MutationBatch createBatch( final ApplicationScope scope, final Collection<Shard> shards,
+                                          final UUID opTimestamp ) {
+
+            final MutationBatch batch =
+                    keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() )
+                            .withTimestamp( opTimestamp.timestamp() );
+
+
+            final C column = getDirectedEdge();
+            final MultiTennantColumnFamily<ApplicationScope, R, C> columnFamily = getColumnFamily();
+            final boolean isDeleted = isDeleted();
+
+
+            for ( Shard shard : shards ) {
+                final R rowKey = getRowKey( shard );
+                writeEdge( batch, columnFamily, scope, rowKey, column, shard, isDeleted );
+            }
+
+
+            return batch;
+        }
+    }
+
+
+    /**
+     * Perform a write of the source->target
+     */
+    private abstract class SourceWriteOp extends RowOp<RowKey, DirectedEdge> {
+
+        private final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily;
+        private final Id sourceNodeId;
+
+
+        private final String type;
+        private final boolean isDeleted;
+        private final DirectedEdge directedEdge;
+
+
+        /**
+         * Write the source write operation
+         */
+        private SourceWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+            this.columnFamily = edgeColumnFamilies.getSourceNodeCfName();
+
+            this.sourceNodeId = markedEdge.getSourceNode();
+
+            this.type = markedEdge.getType();
+            this.isDeleted = markedEdge.isDeleted();
+
+            this.directedEdge = new DirectedEdge( markedEdge.getTargetNode(), markedEdge.getTimestamp() );
+        }
+
+
+        @Override
+        protected MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> getColumnFamily() {
+            return columnFamily;
+        }
+
+
+        @Override
+        public RowKey getRowKey( final Shard shard ) {
+            return new RowKey( sourceNodeId, type, shard.getShardIndex() );
+        }
+
+
+        @Override
+        protected DirectedEdge getDirectedEdge() {
+            return directedEdge;
+        }
+
+
+        @Override
+        protected boolean isDeleted() {
+            return isDeleted;
+        }
+    }
+
+
+    /**
+     * Perform a write of the source->target with target type
+     */
+    private abstract class SourceTargetTypeWriteOp extends RowOp<RowKeyType, DirectedEdge> {
+
+        private final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily;
+        private final Id sourceNodeId;
+        private final String type;
+        private Id targetId;
+        private final boolean isDeleted;
+        private final DirectedEdge directedEdge;
+
+
+        /**
+         * Write the source write operation
+         */
+        private SourceTargetTypeWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+            this.columnFamily = edgeColumnFamilies.getSourceNodeTargetTypeCfName();
+
+            this.sourceNodeId = markedEdge.getSourceNode();
+
+            this.type = markedEdge.getType();
+            this.targetId = markedEdge.getTargetNode();
+            this.isDeleted = markedEdge.isDeleted();
+
+            this.directedEdge = new DirectedEdge( targetId, markedEdge.getTimestamp() );
+        }
+
+
+        @Override
+        protected MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> getColumnFamily() {
+            return columnFamily;
+        }
+
+
+        @Override
+        public RowKeyType getRowKey( final Shard shard ) {
+            return new RowKeyType( sourceNodeId, type, targetId, shard.getShardIndex() );
+        }
+
+
+        @Override
+        protected DirectedEdge getDirectedEdge() {
+            return directedEdge;
+        }
+
+
+        @Override
+        protected boolean isDeleted() {
+            return isDeleted;
+        }
+    }
+
+
+    /**
+     * Perform a write of the target <-- source
+     */
+    private abstract class TargetWriteOp extends RowOp<RowKey, DirectedEdge> {
+
+        private final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily;
+        private final Id targetNode;
+
+
+        private final String type;
+        private final boolean isDeleted;
+        private final DirectedEdge directedEdge;
+
+
+        /**
+         * Write the source write operation
+         */
+        private TargetWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+            this.columnFamily = edgeColumnFamilies.getTargetNodeCfName();
+
+            this.targetNode = markedEdge.getTargetNode();
+
+            this.type = markedEdge.getType();
+            this.isDeleted = markedEdge.isDeleted();
+
+            this.directedEdge = new DirectedEdge( markedEdge.getSourceNode(), markedEdge.getTimestamp() );
+        }
+
+
+        @Override
+        protected MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> getColumnFamily() {
+            return columnFamily;
+        }
+
+
+        @Override
+        public RowKey getRowKey( final Shard shard ) {
+            return new RowKey( targetNode, type, shard.getShardIndex() );
+        }
+
+
+        @Override
+        protected DirectedEdge getDirectedEdge() {
+            return directedEdge;
+        }
+
+
+        @Override
+        protected boolean isDeleted() {
+            return isDeleted;
+        }
+    }
+
+
+    /**
+     * Perform a write of the target<--source with source type
+     */
+    private abstract class TargetSourceTypeWriteOp extends RowOp<RowKeyType, DirectedEdge> {
+
+        private final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily;
+        private final Id targetNode;
+
+        private final Id sourceNode;
+
+        final String type;
+
+        final boolean isDeleted;
+        final DirectedEdge directedEdge;
+
 
         /**
-         * Perform the count on the edge
+         * Write the source write operation
          */
-        void countEdge( final Shard shard, final DirectedEdgeMeta directedEdgeMeta);
+        private TargetSourceTypeWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+            this.columnFamily = edgeColumnFamilies.getSourceNodeTargetTypeCfName();
+
+            this.targetNode = markedEdge.getTargetNode();
+            this.sourceNode = markedEdge.getSourceNode();
+
+            this.type = markedEdge.getType();
+            this.isDeleted = markedEdge.isDeleted();
+
+            this.directedEdge = new DirectedEdge( sourceNode, markedEdge.getTimestamp() );
+        }
+
+
+        @Override
+        protected MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> getColumnFamily() {
+            return columnFamily;
+        }
+
+
+        @Override
+        public RowKeyType getRowKey( final Shard shard ) {
+            return new RowKeyType( targetNode, type, sourceNode, shard.getShardIndex() );
+        }
+
+
+        @Override
+        protected DirectedEdge getDirectedEdge() {
+            return directedEdge;
+        }
+
+
+        @Override
+        protected boolean isDeleted() {
+            return isDeleted;
+        }
+    }
+
+
+    /**
+     * Perform a write of the edge versions
+     */
+    private abstract class EdgeVersions extends RowOp<EdgeRowKey, Long> {
+
+        private final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily;
+        private final Id targetNode;
+
+        private final Id sourceNode;
+
+        final String type;
+
+        final boolean isDeleted;
+        final Long edgeVersion;
+
 
         /**
-         * Write the edge into the version cf
+         * Write the source write operation
          */
-        void writeVersion( final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily,
-                           final EdgeRowKey rowKey, long timestamp );
+        private EdgeVersions( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+            this.columnFamily = edgeColumnFamilies.getGraphEdgeVersions();
+
+            this.targetNode = markedEdge.getTargetNode();
+            this.sourceNode = markedEdge.getSourceNode();
+
+            this.type = markedEdge.getType();
+            this.isDeleted = markedEdge.isDeleted();
+
+            this.edgeVersion = markedEdge.getTimestamp();
+        }
+
+
+        @Override
+        protected MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> getColumnFamily() {
+            return columnFamily;
+        }
+
+
+        @Override
+        public EdgeRowKey getRowKey( final Shard shard ) {
+            return new EdgeRowKey( sourceNode, type, targetNode, shard.getShardIndex() );
+        }
+
+
+        @Override
+        protected Long getDirectedEdge() {
+            return edgeVersion;
+        }
+
+
+        @Override
+        protected boolean isDeleted() {
+            return isDeleted;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
new file mode 100644
index 0000000..0c7e5b5
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
@@ -0,0 +1,128 @@
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.MultiKeyColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.model.ConsistencyLevel;
+import com.netflix.astyanax.query.RowQuery;
+import com.netflix.astyanax.util.RangeBuilder;
+
+
+/**
+ * Internal iterator to iterate over multiple row keys
+ *
+ * @param <R> The row type
+ * @param <C> The column type
+ * @param <T> The parsed return type
+ */
+public class ShardsColumnIterator<R, C, T> implements Iterator<T> {
+
+    private final EdgeSearcher<R, C, T> searcher;
+
+    private final MultiTennantColumnFamily<ApplicationScope, R, C> cf;
+
+    private Iterator<T> currentColumnIterator;
+
+    private final Keyspace keyspace;
+
+    private final int pageSize;
+
+    private final ConsistencyLevel consistencyLevel;
+
+
+    public ShardsColumnIterator( final EdgeSearcher<R, C, T> searcher,
+                             final MultiTennantColumnFamily<ApplicationScope, R, C> cf, final Keyspace keyspace,
+                             final ConsistencyLevel consistencyLevel, final int pageSize ) {
+        this.searcher = searcher;
+        this.cf = cf;
+        this.keyspace = keyspace;
+        this.pageSize = pageSize;
+        this.consistencyLevel = consistencyLevel;
+    }
+
+
+    @Override
+    public boolean hasNext() {
+
+        /**
+         * Iterator isn't initialized, start it
+         */
+        if(currentColumnIterator == null){
+            startIterator();
+        }
+
+        return currentColumnIterator.hasNext();
+    }
+
+
+    @Override
+    public T next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "There are no more rows or columns left to advance" );
+        }
+
+        return currentColumnIterator.next();
+    }
+
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException( "Remove is unsupported" );
+    }
+
+
+    /**
+     * Advance our iterator to the next row (assumes the check for row keys is elsewhere)
+     */
+    private void startIterator() {
+
+        /**
+         * If the edge is present, we need to being seeking from this
+         */
+
+        final RangeBuilder rangeBuilder = new RangeBuilder().setLimit( pageSize );
+
+
+        //set the range into the search
+        searcher.setRange( rangeBuilder );
+
+        /**
+         * Get our list of slices
+         */
+        final List<ScopedRowKey<ApplicationScope, R>> rowKeys = searcher.getRowKeys();
+
+
+        final List<ColumnNameIterator<C, T>> columnNameIterators = new ArrayList<>( rowKeys.size() );
+
+        for(ScopedRowKey<ApplicationScope, R> rowKey: rowKeys){
+
+
+
+           final  RowQuery<ScopedRowKey<ApplicationScope, R>, C> query =
+                    keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKey( rowKey )
+                            .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
+
+
+            final ColumnNameIterator<C, T> columnNameIterator = new ColumnNameIterator<>( query, searcher, searcher.hasPage() );
+
+            columnNameIterators.add( columnNameIterator );
+
+        }
+
+
+
+        currentColumnIterator = new MultiKeyColumnNameIterator<>(columnNameIterators, searcher, pageSize);
+
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 8f5171b..59bf014 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -21,32 +21,29 @@
 package org.apache.usergrid.persistence.graph;
 
 
-import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Ignore;
 import org.junit.Test;
-import org.junit.runners.model.InitializationError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.Marker;
 
-import org.apache.cassandra.thrift.Cassandra;
 import org.apache.commons.lang.time.StopWatch;
 
 import org.apache.usergrid.persistence.core.cassandra.CassandraRule;
-import org.apache.usergrid.persistence.core.cassandra.ITRunner;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.migration.MigrationException;
 import org.apache.usergrid.persistence.core.migration.MigrationManager;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -55,16 +52,20 @@ import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Slf4jReporter;
 import com.google.common.base.Optional;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
+import com.netflix.config.ConfigurationManager;
 
 import rx.Observable;
-import rx.Subscriber;
 
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
@@ -80,23 +81,68 @@ public class GraphManagerShardConsistencyIT {
     @ClassRule
     public static CassandraRule rule = new CassandraRule();
 
+    private static final MetricRegistry registry = new MetricRegistry();
+
+    private static final Meter writeMeter = registry.meter( "writeThroughput" );
+
+    private static final Slf4jReporter reporter = Slf4jReporter.forRegistry( registry )
+                                                .outputTo( log )
+                                                .convertRatesTo( TimeUnit.SECONDS )
+                                                .convertDurationsTo( TimeUnit.MILLISECONDS )
+                                                .build();
+
+
 
     protected ApplicationScope scope;
 
-    protected int numWorkers;
+
+    protected Object originalShardSize;
+
+    protected Object originalShardTimeout;
+
+    protected Object originalShardDelta;
 
     @Before
     public void setupOrg() {
 
 
+        originalShardSize = ConfigurationManager.getConfigInstance().getProperty( GraphFig.SHARD_SIZE );
+
+        originalShardTimeout = ConfigurationManager.getConfigInstance().getProperty( GraphFig.SHARD_CACHE_TIMEOUT );
+
+        originalShardDelta =  ConfigurationManager.getConfigInstance().getProperty( GraphFig.SHARD_MIN_DELTA );
+
+
+        ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_SIZE, 10000 );
+
+
+        final long cacheTimeout = 10000;
+        //set our cache timeout to 10 seconds
+        ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_CACHE_TIMEOUT, cacheTimeout );
+
+
+        final long minDelta = ( long ) (cacheTimeout * 2.5);
+
+        ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_MIN_DELTA, minDelta );
+
+
+
 
         //get the system property of the UUID to use.  If one is not set, use the defualt
         String uuidString = System.getProperty( "org.id", "80a42760-b699-11e3-a5e2-0800200c9a66" );
 
         scope = new ApplicationScopeImpl( createId( UUID.fromString( uuidString ), "test" ) );
 
-        numWorkers = Integer.parseInt( System.getProperty( "numWorkers", "4" ) );
-//        readCount = Integer.parseInt( System.getProperty( "readCount", "20000" ) );
+
+
+        reporter.start( 10, TimeUnit.SECONDS );
+    }
+
+
+    @After
+    public void tearDown(){
+        reporter.stop();
+        reporter.report();
     }
 
 
@@ -131,7 +177,7 @@ public class GraphManagerShardConsistencyIT {
     @Test
     public void writeThousandsSingleTarget() throws InterruptedException, ExecutionException, MigrationException {
 
-        final Id targetId = createId("target");
+        final Id sourceId = createId("source");
         final String edgeType = "test";
 
         EdgeGenerator generator = new EdgeGenerator() {
@@ -139,7 +185,7 @@ public class GraphManagerShardConsistencyIT {
 
             @Override
             public Edge newEdge() {
-                Edge edge = createEdge( createId( "source" ), edgeType,  targetId );
+                Edge edge = createEdge( sourceId, edgeType,  createId( "target" ) );
 
 
                 return edge;
@@ -148,33 +194,45 @@ public class GraphManagerShardConsistencyIT {
 
             @Override
             public Observable<Edge> doSearch( final GraphManager manager ) {
-                return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", System.currentTimeMillis(), null ) );
+                return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), null ) );
             }
         };
 
 
+        final int numInjectors = 2;
+
         /**
          * create 3 injectors.  This way all the caches are independent of one another.  This is the same as
          * multiple nodes
          */
-        final List<Injector> injectors = createInjectors(3);
+        final List<Injector> injectors = createInjectors(numInjectors);
 
 
         final GraphFig graphFig = getInstance( injectors, GraphFig.class );
 
         final long shardSize =  graphFig.getShardSize();
 
+
+        //we don't want to starve the cass runtime since it will be on the same box. Only take 50% of processing power for writes
+        final int numProcessors = Runtime.getRuntime().availableProcessors() /2 ;
+
+        final int numWorkers = numProcessors/numInjectors;
+
+
         /**
          * Do 4x shard size so we should have approximately 4 shards
          */
         final long numberOfEdges =  shardSize * 4;
 
+
         final long countPerWorker = numberOfEdges/numWorkers;
 
         final long writeLimit = countPerWorker;
 
 
-//        HystrixCassandra.ASYNC_GROUP.
+
+        //min stop time the min delta + 1 cache cycle timeout
+        final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout();
 
 
 
@@ -185,7 +243,7 @@ public class GraphManagerShardConsistencyIT {
         for(Injector injector: injectors) {
             final GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class );
 
-            futures.addAll( doTest( gmf, generator, writeLimit ) );
+            futures.addAll( doTest( gmf, generator, numWorkers,  writeLimit, minExecutionTime ) );
         }
 
         for(Future<Boolean> future: futures){
@@ -193,18 +251,20 @@ public class GraphManagerShardConsistencyIT {
         }
 
         //now get all our shards
-        final NodeShardAllocation allocation = getInstance( injectors, NodeShardAllocation.class );
+        final NodeShardCache cache = getInstance( injectors, NodeShardCache.class );
 
-        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNode( targetId, edgeType );
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, edgeType );
 
         int count = 0;
 
-        while(count < 4) {
+        while(true) {
 
             //reset our count.  Ultimately we'll have 4 groups once our compaction completes
             count = 0;
 
-            final Iterator<ShardEntryGroup> groups = allocation.getShards( scope, Optional.<Shard>absent(), directedEdgeMeta );
+
+            //we have to get it from the cache, because this will trigger the compaction process
+            final Iterator<ShardEntryGroup> groups = cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
 
             while(groups.hasNext()){
                 final ShardEntryGroup group = groups.next();
@@ -214,6 +274,13 @@ public class GraphManagerShardConsistencyIT {
                 count++;
 
             }
+
+            //we're done
+            if(count == 4){
+                break;
+            }
+
+            Thread.sleep(5000);
         }
 
 
@@ -252,14 +319,14 @@ public class GraphManagerShardConsistencyIT {
     /**
      * Execute the test with the generator
      */
-    private List<Future<Boolean>> doTest(final GraphManagerFactory factory, final EdgeGenerator generator, final long writeCount ) throws InterruptedException, ExecutionException {
+    private List<Future<Boolean>> doTest(final GraphManagerFactory factory, final EdgeGenerator generator, final int numWorkers, final long writeCount, final long minExecutionTime ) throws InterruptedException, ExecutionException {
 
         ExecutorService executor = Executors.newFixedThreadPool( numWorkers );
 
         List<Future<Boolean>> futures = new ArrayList<>( numWorkers );
 
         for ( int i = 0; i < numWorkers; i++ ) {
-            Future<Boolean> future = executor.submit( new Worker(factory, generator, writeCount ) );
+            Future<Boolean> future = executor.submit( new Worker(factory, generator, writeCount, minExecutionTime ) );
 
             futures.add( future );
         }
@@ -273,12 +340,14 @@ public class GraphManagerShardConsistencyIT {
         private final GraphManagerFactory factory;
         private final EdgeGenerator generator;
         private final long writeLimit;
+        private final long minExecutionTime;
 
 
-        private Worker( final GraphManagerFactory factory, final EdgeGenerator generator, final long writeLimit) {
+        private Worker( final GraphManagerFactory factory, final EdgeGenerator generator, final long writeLimit, final long minExecutionTime ) {
             this.factory = factory;
             this.generator = generator;
             this.writeLimit = writeLimit;
+            this.minExecutionTime = minExecutionTime;
         }
 
 
@@ -287,10 +356,11 @@ public class GraphManagerShardConsistencyIT {
             GraphManager manager = factory.createEdgeManager( scope );
 
 
-            final StopWatch timer = new StopWatch();
-            timer.start();
 
-            for ( long i = 0; i < writeLimit; i++ ) {
+            final long startTime = System.currentTimeMillis();
+
+
+            for ( long i = 0; i < writeLimit || System.currentTimeMillis() - startTime < minExecutionTime ; i++ ) {
 
                 Edge edge = generator.newEdge();
 
@@ -300,13 +370,13 @@ public class GraphManagerShardConsistencyIT {
                 assertNotNull( "Returned has a version", returned.getTimestamp() );
 
 
+                writeMeter.mark();
+
                 if ( i % 1000 == 0 ) {
                     log.info( "   Wrote: " + i );
                 }
             }
 
-            timer.stop();
-            log.info( "Total time to write {} entries {} ms", writeLimit, timer.getTime() );
 
             return true;
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index e4c7627..878674a 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -260,7 +260,7 @@ public class NodeShardAllocationTest {
         //mock up returning the value
         when( shardedEdgeSerialization
                 .getEdgesFromSourceByTargetType( same( edgeColumnFamilies ), same( scope ), any( SearchByIdType.class ),
-                        any( Iterator.class ) ) ).thenReturn( edgeIterator );
+                        any( Collection.class ) ) ).thenReturn( edgeIterator );
 
 
         final boolean result = approximation.auditShard( scope, shardEntryGroup, targetEdgeMeta );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
index 498d5d1..8e9ed5c 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
@@ -453,6 +453,12 @@ public class NodeShardApproximationTest {
 
 
         @Override
+        public double getShardRepairChance() {
+            return 0;
+        }
+
+
+        @Override
         public long getShardSize() {
             return 0;  //To change body of implemented methods use File | Settings | File Templates.
         }
@@ -477,6 +483,12 @@ public class NodeShardApproximationTest {
 
 
         @Override
+        public int getShardCacheRefreshWorkerCount() {
+            return 0;
+        }
+
+
+        @Override
         public long getCounterFlushCount() {
             return 100000l;
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/test/resources/usergrid-SHARD.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/resources/usergrid-SHARD.properties b/stack/corepersistence/graph/src/test/resources/usergrid-SHARD.properties
new file mode 100644
index 0000000..6097446
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/resources/usergrid-SHARD.properties
@@ -0,0 +1,23 @@
+#
+# /*
+#  * Licensed to the Apache Software Foundation (ASF) under one
+#  * or more contributor license agreements.  See the NOTICE file
+#  * distributed with this work for additional information
+#  * regarding copyright ownership.  The ASF licenses this file
+#  * to you under the Apache License, Version 2.0 (the
+#  * "License"); you may not use this file except in compliance
+#  * with the License.  You may obtain a copy of the License at
+#  *
+#  *    http://www.apache.org/licenses/LICENSE-2.0
+#  *
+#  * Unless required by applicable law or agreed to in writing,
+#  * software distributed under the License is distributed on an
+#  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  * KIND, either express or implied.  See the License for the
+#  * specific language governing permissions and limitations
+#  * under the License.
+#  */
+#
+
+# Keep nothing but overriding test defaults in here
+usergrid.graph.shard.size=10000