You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/18 21:42:14 UTC

[3/3] git commit: Finished refactor of low-level serialization to remove the impedance mismatch between the read and write APIs.

Finished refactor of low-level serialization to remove the impedance mismatch between the read and write APIs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c69c1974
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c69c1974
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c69c1974

Branch: refs/heads/USERGRID-188
Commit: c69c1974ade1a0a580ccf633f6a1cb8dc14dd1f0
Parents: cef3de9
Author: Todd Nine <to...@apache.org>
Authored: Fri Aug 15 16:49:25 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Mon Aug 18 13:41:31 2014 -0600

----------------------------------------------------------------------
 .../core/astyanax/ColumnNameIterator.java       |  24 +-
 .../core/hystrix/HystrixCassandra.java          |  10 +-
 stack/corepersistence/graph/pom.xml             |   7 +
 .../usergrid/persistence/graph/GraphFig.java    |  13 +
 .../graph/impl/CollectionIndexObserver.java     |   2 +-
 .../impl/EdgeSerializationImpl.java             | 256 ++++++-
 .../impl/shard/DirectedEdgeMeta.java            |  65 +-
 .../impl/shard/ShardEntryGroup.java             |  16 +-
 .../impl/shard/ShardGroupCompaction.java        |  39 +-
 .../impl/shard/ShardedEdgeSerialization.java    |  91 ++-
 .../NodeShardCounterSerializationImpl.java      |   2 +-
 .../impl/shard/impl/EdgeSearcher.java           |  32 +-
 .../shard/impl/NodeShardAllocationImpl.java     |  17 +-
 .../impl/shard/impl/NodeShardCacheImpl.java     | 127 ++--
 .../shard/impl/ShardGroupColumnIterator.java    | 130 ++++
 .../shard/impl/ShardGroupCompactionImpl.java    | 242 ++++++-
 .../impl/shard/impl/ShardRowIterator.java       | 134 ----
 .../impl/ShardedEdgeSerializationImpl.java      | 707 ++++++++++++++-----
 .../impl/shard/impl/ShardsColumnIterator.java   | 128 ++++
 .../graph/GraphManagerShardConsistencyIT.java   | 126 +++-
 .../impl/shard/NodeShardAllocationTest.java     |   2 +-
 .../shard/count/NodeShardApproximationTest.java |  12 +
 .../test/resources/usergrid-SHARD.properties    |  23 +
 23 files changed, 1682 insertions(+), 523 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIterator.java
index 050157a..af4e1f9 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIterator.java
@@ -40,20 +40,15 @@ public class ColumnNameIterator<C, T> implements Iterable<T>, Iterator<T> {
 
     private final RowQuery<?, C> rowQuery;
     private final ColumnParser<C, T> parser;
+    private final boolean skipFirst;
 
     private Iterator<Column<C>> sourceIterator;
 
 
-    public ColumnNameIterator( RowQuery<?, C> rowQuery, final ColumnParser<C, T> parser, final boolean skipFirst) {
+    public ColumnNameIterator( RowQuery<?, C> rowQuery, final ColumnParser<C, T> parser, final boolean skipFirst ) {
         this.rowQuery = rowQuery.autoPaginate( true );
         this.parser = parser;
-
-        advanceIterator();
-
-        //if we are to skip the first element, we need to advance the iterator
-        if ( skipFirst && sourceIterator.hasNext() ) {
-            sourceIterator.next();
-        }
+        this.skipFirst = skipFirst;
     }
 
 
@@ -65,6 +60,19 @@ public class ColumnNameIterator<C, T> implements Iterable<T>, Iterator<T> {
 
     @Override
     public boolean hasNext() {
+
+        if ( sourceIterator == null ) {
+            advanceIterator();
+
+
+            //if we are to skip the first element, we need to advance the iterator
+            if ( skipFirst && sourceIterator.hasNext() ) {
+                sourceIterator.next();
+            }
+
+            return sourceIterator.hasNext();
+        }
+
         //if we've exhausted this iterator, try to advance to the next set
         if ( sourceIterator.hasNext() ) {
             return true;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
index 7758651..356850e 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
@@ -42,20 +42,22 @@ public class HystrixCassandra {
     /**
      * Command group used for realtime user commands
      */
-    public static final HystrixCommandGroupKey USER_GROUP = HystrixCommandGroupKey.Factory.asKey( "user" );
+    public static final HystrixCommand.Setter
+            USER_GROUP = HystrixCommand.Setter.withGroupKey(   HystrixCommandGroupKey.Factory.asKey( "user" ) ).andThreadPoolPropertiesDefaults(
+            HystrixThreadPoolProperties.Setter().withCoreSize( 100 ) );
 
     /**
      * Command group for asynchronous operations
      */
-    public static final HystrixCommandGroupKey ASYNC_GROUP = HystrixCommandGroupKey.Factory.asKey( "async" );
+    public static final HystrixCommand.Setter
+            ASYNC_GROUP = HystrixCommand.Setter.withGroupKey( HystrixCommandGroupKey.Factory.asKey( "async" ) ).andThreadPoolPropertiesDefaults(
+            HystrixThreadPoolProperties.Setter().withCoreSize( 50 ) );
 
 
     /**
      * Execute an user operation
      */
     public static <R> OperationResult<R> user( final Execution<R> execution) {
-
-
         return new HystrixCommand<OperationResult<R>>( USER_GROUP ) {
 
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/pom.xml b/stack/corepersistence/graph/pom.xml
index eee82fd..8174a56 100644
--- a/stack/corepersistence/graph/pom.xml
+++ b/stack/corepersistence/graph/pom.xml
@@ -72,6 +72,13 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+        <groupId>com.codahale.metrics</groupId>
+        <artifactId>metrics-core</artifactId>
+        <version>3.0.2</version>
+        <scope>test</scope>
+    </dependency>
+
 
 
   </dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 45064c9..d434db7 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -45,6 +45,12 @@ public interface GraphFig extends GuicyFig {
      */
     public static final String SHARD_CACHE_TIMEOUT = "usergrid.graph.shard.cache.timeout";
 
+    public static final String SHARD_CACHE_REFRESH_WORKERS = "usergrid.graph.shard.refresh.worker.count";
+
+    public static final String SHARD_REPAIR_CHANCE = "usergrid.graph.shard.repair.chance";
+
+
+
     /**
      * The minimum amount of time than can occur (in millis) between shard allocation.  Must be at least 2x the cache timeout.
      *
@@ -74,6 +80,10 @@ public interface GraphFig extends GuicyFig {
     int getRepairConcurrentSize();
 
 
+    @Default( ".10" )
+    @Key( SHARD_REPAIR_CHANCE  )
+    double getShardRepairChance();
+
 
     @Default("500000")
     @Key(SHARD_SIZE)
@@ -94,6 +104,9 @@ public interface GraphFig extends GuicyFig {
     long getShardCacheSize();
 
 
+    @Default( "2" )
+    @Key( SHARD_CACHE_REFRESH_WORKERS )
+    int getShardCacheRefreshWorkerCount();
 
 
     @Default( "10000" )

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java
index a4a7a70..d792463 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java
@@ -61,7 +61,7 @@ public class CollectionIndexObserver{
 //
 //        //entity exists, write the edge
 //        if(entity.getEntity().isPresent()){
-//            em.writeEdge( edge ).toBlocking().last();
+//            em.writeEdgeFromSource( edge ).toBlocking().last();
 //        }
 //        //entity does not exist, it's been removed, mark the edge
 //        else{

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index d3f29a0..4c1ae79 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -20,12 +20,14 @@
 package org.apache.usergrid.persistence.graph.serialization.impl;
 
 
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.UUID;
 
 import javax.inject.Inject;
 
 import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.graph.GraphFig;
@@ -37,8 +39,10 @@ import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupColumnIterator;
 import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -53,7 +57,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
  * Serialization for edges.  Delegates partitioning to the sharding strategy.
  */
 @Singleton
-public class EdgeSerializationImpl implements EdgeSerialization  {
+public class EdgeSerializationImpl implements EdgeSerialization {
 
 
     protected final Keyspace keyspace;
@@ -62,18 +66,23 @@ public class EdgeSerializationImpl implements EdgeSerialization  {
     protected final EdgeShardStrategy edgeShardStrategy;
     protected final EdgeColumnFamilies edgeColumnFamilies;
     protected final ShardedEdgeSerialization shardedEdgeSerialization;
+    protected final TimeService timeService;
 
 
     @Inject
     public EdgeSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
                                   final GraphFig graphFig, final EdgeShardStrategy edgeShardStrategy,
                                   final EdgeColumnFamilies edgeColumnFamilies,
-                                  final ShardedEdgeSerialization shardedEdgeSerialization ) {
+                                  final ShardedEdgeSerialization shardedEdgeSerialization,
+                                  final TimeService timeService ) {
 
 
-        checkNotNull( "keyspace required", keyspace );
-        checkNotNull( "cassandraConfig required", cassandraConfig );
-        checkNotNull( "consistencyFig required", graphFig );
+        checkNotNull( keyspace, "keyspace required" );
+        checkNotNull( cassandraConfig, "cassandraConfig required" );
+        checkNotNull( edgeShardStrategy, "edgeShardStrategy required" );
+        checkNotNull( edgeColumnFamilies, "edgeColumnFamilies required" );
+        checkNotNull( shardedEdgeSerialization, "shardedEdgeSerialization required" );
+        checkNotNull( timeService, "timeService required" );
 
 
         this.keyspace = keyspace;
@@ -82,18 +91,186 @@ public class EdgeSerializationImpl implements EdgeSerialization  {
         this.edgeShardStrategy = edgeShardStrategy;
         this.edgeColumnFamilies = edgeColumnFamilies;
         this.shardedEdgeSerialization = shardedEdgeSerialization;
+        this.timeService = timeService;
     }
 
 
     @Override
     public MutationBatch writeEdge( final ApplicationScope scope, final MarkedEdge markedEdge, final UUID timestamp ) {
-        return shardedEdgeSerialization.writeEdge( edgeColumnFamilies, scope, markedEdge, timestamp );
+
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateEdge( markedEdge );
+        ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+        final long now = timeService.getCurrentTime();
+        final Id sourceNode = markedEdge.getSourceNode();
+        final Id targetNode = markedEdge.getTargetNode();
+        final String edgeType = markedEdge.getType();
+        final long edgeTimestamp = markedEdge.getTimestamp();
+
+        /**
+         * Source write
+         */
+        final DirectedEdgeMeta sourceEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceNode, edgeType );
+
+        final Collection<Shard> sourceWriteShards =
+                edgeShardStrategy.getWriteShards( scope, edgeTimestamp, sourceEdgeMeta ).getWriteShards( now );
+
+        final MutationBatch batch = shardedEdgeSerialization
+                .writeEdgeFromSource( edgeColumnFamilies, scope, markedEdge, sourceWriteShards, sourceEdgeMeta,
+                        timestamp );
+
+
+        /**
+         * Source with target type write
+         */
+        final DirectedEdgeMeta sourceTargetTypeEdgeMeta =
+                DirectedEdgeMeta.fromSourceNodeTargetType( sourceNode, edgeType, targetNode.getType() );
+
+        final Collection<Shard> sourceTargetTypeWriteShards =
+                edgeShardStrategy.getWriteShards( scope, edgeTimestamp, sourceTargetTypeEdgeMeta )
+                                 .getWriteShards( now );
+
+        batch.mergeShallow( shardedEdgeSerialization
+                .writeEdgeFromSourceWithTargetType( edgeColumnFamilies, scope, markedEdge, sourceTargetTypeWriteShards,
+                        sourceTargetTypeEdgeMeta, timestamp ) );
+
+
+        /**
+         * Target write
+         *
+         */
+
+        final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromTargetNode( targetNode, edgeType );
+
+        final Collection<Shard> targetWriteShards =
+                edgeShardStrategy.getWriteShards( scope, edgeTimestamp, targetEdgeMeta ).getWriteShards( now );
+
+        batch.mergeShallow( shardedEdgeSerialization
+                .writeEdgeToTarget( edgeColumnFamilies, scope, markedEdge, targetWriteShards, targetEdgeMeta,
+                        timestamp ) );
+
+
+        /**
+         * Target with source type write
+         */
+
+        final DirectedEdgeMeta targetSourceTypeEdgeMeta =
+                DirectedEdgeMeta.fromTargetNodeSourceType( targetNode, edgeType, sourceNode.getType() );
+
+        final Collection<Shard> targetSourceTypeWriteShards =
+                edgeShardStrategy.getWriteShards( scope, edgeTimestamp, targetSourceTypeEdgeMeta )
+                                 .getWriteShards( now );
+
+        batch.mergeShallow( shardedEdgeSerialization
+                .writeEdgeToTargetWithSourceType( edgeColumnFamilies, scope, markedEdge, targetSourceTypeWriteShards,
+                        targetSourceTypeEdgeMeta, timestamp ) );
+
+
+        /**
+         * Version write
+         */
+
+        final DirectedEdgeMeta edgeVersionsMeta = DirectedEdgeMeta.fromEdge( sourceNode, targetNode, edgeType );
+
+        final Collection<Shard> edgeVersionsShards =
+                edgeShardStrategy.getWriteShards( scope, edgeTimestamp, edgeVersionsMeta ).getWriteShards( now );
+
+        batch.mergeShallow( shardedEdgeSerialization
+                .writeEdgeVersions( edgeColumnFamilies, scope, markedEdge, edgeVersionsShards,
+                        edgeVersionsMeta, timestamp ) );
+
+
+        return batch;
     }
 
 
     @Override
     public MutationBatch deleteEdge( final ApplicationScope scope, final MarkedEdge markedEdge, final UUID timestamp ) {
-        return shardedEdgeSerialization.deleteEdge( edgeColumnFamilies, scope, markedEdge, timestamp );
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateEdge( markedEdge );
+        ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+        final long now = timeService.getCurrentTime();
+        final Id sourceNode = markedEdge.getSourceNode();
+        final Id targetNode = markedEdge.getTargetNode();
+        final String edgeType = markedEdge.getType();
+        final long edgeTimestamp = markedEdge.getTimestamp();
+
+        /**
+         * Source delete
+         */
+        final DirectedEdgeMeta sourceEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceNode, edgeType );
+
+        final Collection<Shard> sourceWriteShards =
+                edgeShardStrategy.getWriteShards( scope, edgeTimestamp, sourceEdgeMeta ).getWriteShards( now );
+
+        final MutationBatch batch = shardedEdgeSerialization
+                .deleteEdgeFromSource( edgeColumnFamilies, scope, markedEdge, sourceWriteShards, sourceEdgeMeta,
+                        timestamp );
+
+
+        /**
+         * Source with target type delete
+         */
+        final DirectedEdgeMeta sourceTargetTypeEdgeMeta =
+                DirectedEdgeMeta.fromSourceNodeTargetType( sourceNode, edgeType, targetNode.getType() );
+
+        final Collection<Shard> sourceTargetTypeWriteShards =
+                edgeShardStrategy.getWriteShards( scope, edgeTimestamp, sourceTargetTypeEdgeMeta )
+                                 .getWriteShards( now );
+
+        batch.mergeShallow( shardedEdgeSerialization
+                .deleteEdgeFromSourceWithTargetType( edgeColumnFamilies, scope, markedEdge, sourceTargetTypeWriteShards,
+                        sourceTargetTypeEdgeMeta, timestamp ) );
+
+
+        /**
+         * Target delete
+         *
+         */
+
+        final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromTargetNode( targetNode, edgeType );
+
+        final Collection<Shard> targetWriteShards =
+                edgeShardStrategy.getWriteShards( scope, edgeTimestamp, targetEdgeMeta ).getWriteShards( now );
+
+        batch.mergeShallow( shardedEdgeSerialization
+                .deleteEdgeToTarget( edgeColumnFamilies, scope, markedEdge, targetWriteShards, targetEdgeMeta,
+                        timestamp ) );
+
+
+        /**
+         * Target with source type delete
+         */
+
+        final DirectedEdgeMeta targetSourceTypeEdgeMeta =
+                DirectedEdgeMeta.fromTargetNodeSourceType( targetNode, edgeType, sourceNode.getType() );
+
+        final Collection<Shard> targetSourceTypeWriteShards =
+                edgeShardStrategy.getWriteShards( scope, edgeTimestamp, targetSourceTypeEdgeMeta )
+                                 .getWriteShards( now );
+
+        batch.mergeShallow( shardedEdgeSerialization
+                .deleteEdgeToTargetWithSourceType( edgeColumnFamilies, scope, markedEdge, targetSourceTypeWriteShards,
+                        targetSourceTypeEdgeMeta, timestamp ) );
+
+
+        /**
+         * Version delete
+         */
+
+        final DirectedEdgeMeta edgeVersionsMeta = DirectedEdgeMeta.fromEdge( sourceNode, targetNode, edgeType );
+
+        final Collection<Shard> edgeVersionsShards =
+                edgeShardStrategy.getWriteShards( scope, edgeTimestamp, edgeVersionsMeta ).getWriteShards( now );
+
+        batch.mergeShallow( shardedEdgeSerialization
+                .deleteEdgeVersions( edgeColumnFamilies, scope, markedEdge, edgeVersionsShards,
+                        edgeVersionsMeta, timestamp ) );
+
+
+        return batch;
     }
 
 
@@ -111,9 +288,17 @@ public class EdgeSerializationImpl implements EdgeSerialization  {
 
 
         final Iterator<ShardEntryGroup> readShards =
-                edgeShardStrategy.getReadShards(scope, maxTimestamp, versionMetaData );
+                edgeShardStrategy.getReadShards( scope, maxTimestamp, versionMetaData );
+
+
+        //now create a result iterator with our iterator of read shards
 
-        return shardedEdgeSerialization.getEdgeVersions( edgeColumnFamilies, scope, search, readShards );
+        return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+            @Override
+            protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
+                return shardedEdgeSerialization.getEdgeVersions( edgeColumnFamilies, scope, search, readShards );
+            }
+        };
     }
 
 
@@ -129,13 +314,17 @@ public class EdgeSerializationImpl implements EdgeSerialization  {
         final long maxTimestamp = edgeType.getMaxTimestamp();
 
 
-        final DirectedEdgeMeta sourceMeta = DirectedEdgeMeta.fromSourceNode( sourceId, type );
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, type );
 
 
-        final Iterator<ShardEntryGroup> readShards =
-                edgeShardStrategy.getReadShards(scope, maxTimestamp, sourceMeta );
+        final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards( scope, maxTimestamp, directedEdgeMeta );
 
-        return shardedEdgeSerialization.getEdgesFromSource( edgeColumnFamilies, scope, edgeType, readShards );
+        return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+            @Override
+            protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
+                return shardedEdgeSerialization.getEdgesFromSource( edgeColumnFamilies, scope, edgeType, readShards );
+            }
+        };
     }
 
 
@@ -151,14 +340,19 @@ public class EdgeSerializationImpl implements EdgeSerialization  {
         final String targetType = edgeType.getIdType();
         final long maxTimestamp = edgeType.getMaxTimestamp();
 
-        final DirectedEdgeMeta sourceMeta = DirectedEdgeMeta.fromSourceNodeTargetType( sourceId, type, targetType );
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( sourceId, type, targetType );
 
 
-        final Iterator<ShardEntryGroup> readShards =
-                edgeShardStrategy.getReadShards(scope, maxTimestamp, sourceMeta );
+        final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards( scope, maxTimestamp, directedEdgeMeta );
 
 
-        return shardedEdgeSerialization.getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, edgeType, readShards );
+        return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+            @Override
+            protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
+                return shardedEdgeSerialization
+                        .getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, edgeType, readShards );
+            }
+        };
     }
 
 
@@ -173,15 +367,17 @@ public class EdgeSerializationImpl implements EdgeSerialization  {
         final long maxTimestamp = edgeType.getMaxTimestamp();
 
 
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNode( targetId, type );
 
-        final DirectedEdgeMeta sourceMeta = DirectedEdgeMeta.fromTargetNode( targetId, type );
 
+        final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards( scope, maxTimestamp, directedEdgeMeta );
 
-        final Iterator<ShardEntryGroup> readShards =
-                edgeShardStrategy.getReadShards(scope, maxTimestamp, sourceMeta );
-
-
-        return shardedEdgeSerialization.getEdgesToTarget( edgeColumnFamilies, scope, edgeType, readShards );
+        return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+            @Override
+            protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
+                return shardedEdgeSerialization.getEdgesToTarget( edgeColumnFamilies, scope, edgeType, readShards );
+            }
+        };
     }
 
 
@@ -198,14 +394,18 @@ public class EdgeSerializationImpl implements EdgeSerialization  {
         final long maxTimestamp = edgeType.getMaxTimestamp();
 
 
-        final DirectedEdgeMeta sourceMeta = DirectedEdgeMeta.fromSourceNodeTargetType( targetId, type, sourceType );
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( targetId, type, sourceType );
 
 
-        final Iterator<ShardEntryGroup> readShards =
-                edgeShardStrategy.getReadShards(scope, maxTimestamp, sourceMeta );
+        final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards( scope, maxTimestamp, directedEdgeMeta );
 
 
-        return shardedEdgeSerialization.getEdgesToTargetBySourceType( edgeColumnFamilies, scope, edgeType, readShards );
+        return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+            @Override
+            protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
+                return shardedEdgeSerialization
+                        .getEdgesToTargetBySourceType( edgeColumnFamilies, scope, edgeType, readShards );
+            }
+        };
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
index 89e46d9..92f2548 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard;
 
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -87,6 +88,36 @@ public abstract class DirectedEdgeMeta {
         public NodeType getNodeType() {
             return nodeType;
         }
+
+
+        @Override
+        public boolean equals( final Object o ) {
+            if ( this == o ) {
+                return true;
+            }
+            if ( !( o instanceof NodeMeta ) ) {
+                return false;
+            }
+
+            final NodeMeta nodeMeta = ( NodeMeta ) o;
+
+            if ( !id.equals( nodeMeta.id ) ) {
+                return false;
+            }
+            if ( nodeType != nodeMeta.nodeType ) {
+                return false;
+            }
+
+            return true;
+        }
+
+
+        @Override
+        public int hashCode() {
+            int result = id.hashCode();
+            result = 31 * result + nodeType.hashCode();
+            return result;
+        }
     }
 
 
@@ -125,9 +156,11 @@ public abstract class DirectedEdgeMeta {
      */
     public abstract Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
                                                     final EdgeColumnFamilies edgeColumnFamilies,
-                                                    final ApplicationScope scope, final ShardEntryGroup group,
+                                                    final ApplicationScope scope, final Collection<Shard> shards,
                                                     final long maxValue );
 
+
+
     /**
      * Get the type of this directed edge
      */
@@ -192,7 +225,7 @@ public abstract class DirectedEdgeMeta {
             @Override
             public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
                                                    final EdgeColumnFamilies edgeColumnFamilies,
-                                                   final ApplicationScope scope, final ShardEntryGroup group,
+                                                   final ApplicationScope scope, final Collection<Shard>  shards,
                                                    final long maxValue ) {
 
                 final Id sourceId = nodes[0].id;
@@ -200,8 +233,7 @@ public abstract class DirectedEdgeMeta {
 
                 final SearchByEdgeType search = new SimpleSearchByEdgeType( sourceId, edgeType, maxValue, null );
 
-                return serialization.getEdgesFromSource( edgeColumnFamilies, scope, search,
-                        Collections.singleton( group ).iterator() );
+                return serialization.getEdgesFromSource( edgeColumnFamilies, scope, search, shards );
             }
 
 
@@ -233,9 +265,9 @@ public abstract class DirectedEdgeMeta {
             @Override
             public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
                                                    final EdgeColumnFamilies edgeColumnFamilies,
-                                                   final ApplicationScope scope, final ShardEntryGroup group,
+                                                   final ApplicationScope scope, final Collection<Shard>shards,
                                                    final long maxValue ) {
-
+//
                 final Id sourceId = nodes[0].id;
                 final String edgeType = types[0];
                 final String targetType = types[1];
@@ -243,8 +275,8 @@ public abstract class DirectedEdgeMeta {
                 final SearchByIdType search =
                         new SimpleSearchByIdType( sourceId, edgeType, maxValue, targetType, null );
 
-                return serialization.getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, search,
-                        Collections.singleton( group ).iterator() );
+                return serialization.getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, search, shards);
+
             }
 
 
@@ -272,7 +304,7 @@ public abstract class DirectedEdgeMeta {
             @Override
             public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
                                                    final EdgeColumnFamilies edgeColumnFamilies,
-                                                   final ApplicationScope scope, final ShardEntryGroup group,
+                                                   final ApplicationScope scope, final Collection<Shard>  shards,
                                                    final long maxValue ) {
 
 
@@ -281,8 +313,7 @@ public abstract class DirectedEdgeMeta {
 
                 final SearchByEdgeType search = new SimpleSearchByEdgeType( targetId, edgeType, maxValue, null );
 
-                return serialization.getEdgesToTarget( edgeColumnFamilies, scope, search,
-                        Collections.singleton( group ).iterator() );
+                return serialization.getEdgesToTarget( edgeColumnFamilies, scope, search, shards );
             }
 
 
@@ -311,7 +342,7 @@ public abstract class DirectedEdgeMeta {
             @Override
             public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
                                                    final EdgeColumnFamilies edgeColumnFamilies,
-                                                   final ApplicationScope scope, final ShardEntryGroup group,
+                                                   final ApplicationScope scope, final Collection<Shard> shards,
                                                    final long maxValue ) {
 
                 final Id targetId = nodes[0].id;
@@ -322,8 +353,7 @@ public abstract class DirectedEdgeMeta {
                 final SearchByIdType search =
                         new SimpleSearchByIdType( targetId, edgeType, maxValue, sourceType, null );
 
-                return serialization.getEdgesToTargetBySourceType( edgeColumnFamilies, scope, search,
-                        Collections.singleton( group ).iterator() );
+                return serialization.getEdgesToTargetBySourceType( edgeColumnFamilies, scope, search, shards);
             }
 
 
@@ -355,7 +385,7 @@ public abstract class DirectedEdgeMeta {
             @Override
             public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
                                                    final EdgeColumnFamilies edgeColumnFamilies,
-                                                   final ApplicationScope scope, final ShardEntryGroup group,
+                                                   final ApplicationScope scope, final Collection<Shard>  shards,
                                                    final long maxValue ) {
 
                 final Id sourceId = nodes[0].id;
@@ -365,8 +395,8 @@ public abstract class DirectedEdgeMeta {
                 final SimpleSearchByEdge search =
                         new SimpleSearchByEdge( sourceId, edgeType, targetId, maxValue, null );
 
-                return serialization.getEdgeVersions( edgeColumnFamilies, scope, search,
-                        Collections.singleton( group ).iterator() );
+                return serialization.getEdgeVersions( edgeColumnFamilies, scope, search, shards);
+
             }
 
 
@@ -378,6 +408,7 @@ public abstract class DirectedEdgeMeta {
     }
 
 
+
     /**
      * Create a directed edge from the stored meta data
      * @param metaType The meta type stored

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
index 59ea9fb..70569fd 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -256,13 +256,13 @@ public class ShardEntryGroup {
     }
 
 
-    /**
-     * Helper method to create a shard entry group with a single shard
-     */
-    public static ShardEntryGroup singletonGroup( final Shard shard, final long delta ) {
-        ShardEntryGroup group = new ShardEntryGroup( delta );
-        group.addShard( shard );
-
-        return group;
+    @Override
+    public String toString() {
+        return "ShardEntryGroup{" +
+                "shards=" + shards +
+                ", delta=" + delta +
+                ", maxCreatedTime=" + maxCreatedTime +
+                ", compactionTarget=" + compactionTarget +
+                '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
index 13c5596..bf4d3c9 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -22,6 +22,10 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard;
 
 
+import java.util.Set;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
 import rx.Observable;
 
 
@@ -36,8 +40,39 @@ public interface ShardGroupCompaction {
     /**
      * Execute the compaction task.  Will return the number of edges that have
      * @param group The shard entry group to compact
-     * @return The number of edges that are now compacted into the target shard
+     * @return The shards that were compacted
+     */
+    public Set<Shard> compact(final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, final ShardEntryGroup group);
+
+    /**
+     * Possibly audit the shard entry group.  This is asynchronous and returns immediately
+     * @param group The shard entry group to evaluate
+     * @return The result of the audit for this group
      */
-    public Observable<Integer> compact(ShardEntryGroup group);
+    public AuditResult evaluateShardGroup( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+                                           final ShardEntryGroup group );
+
+
+    public enum AuditResult{
+        /**
+         * We didn't check this shard
+         */
+        NOT_CHECKED,
+        /**
+         * This shard was checked, but nothing was allocated
+         */
+        CHECKED_NO_OP,
+
+        /**
+         * We checked and created a new shard
+         */
+        CHECKED_CREATED,
+
+        /**
+         * The shard group is already compacting
+         */
+        COMPACTING
+    }
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
index d1ad18d..51fdf0c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
@@ -19,7 +19,9 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard;
 
 
+import java.util.Collection;
 import java.util.Iterator;
+import java.util.Set;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -44,19 +46,84 @@ public interface ShardedEdgeSerialization {
      * @param markedEdge The edge to write
      * @param timestamp The timestamp to use
      */
-    MutationBatch writeEdge( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
-                             UUID timestamp );
+    MutationBatch writeEdgeFromSource( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+                                       Collection<Shard> shards,  DirectedEdgeMeta sourceEdgeMeta,  UUID timestamp );
+
+
+    /**
+     * Write the edge from source->target
+     */
+    MutationBatch writeEdgeFromSourceWithTargetType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+                                                     MarkedEdge markedEdge, Collection<Shard> shards,  DirectedEdgeMeta sourceEdgeMeta, UUID timestamp );
+
+    /**
+     * Write the edge from target to source
+     */
+    MutationBatch writeEdgeToTarget( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+                                     Collection<Shard> shards,  DirectedEdgeMeta sourceEdgeMeta, UUID timestamp );
+
+
+    /**
+     * Write the edge from target to source with source type
+     */
+    MutationBatch writeEdgeToTargetWithSourceType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+                                                   MarkedEdge markedEdge, Collection<Shard> shards, DirectedEdgeMeta sourceEdgeMeta,  UUID timestamp );
+
+
+    /**
+        * EdgeWrite both the source--->Target edge and the target <----- source edge into the mutation
+        *
+        * @param columnFamilies The column families to use
+        * @param scope The org scope of the graph
+        * @param markedEdge The edge to write
+        * @param timestamp The timestamp to use
+        */
+       MutationBatch writeEdgeVersions( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+                                          Collection<Shard> shards,  DirectedEdgeMeta sourceEdgeMeta,  UUID timestamp );
+
+
 
     /**
-     * EdgeWrite both the source -->target edge and the target<--- source edge into the mutation
+     * EdgeWrite both the source--->Target edge and the target <----- source edge into the mutation
      *
      * @param columnFamilies The column families to use
      * @param scope The org scope of the graph
      * @param markedEdge The edge to write
-     * @param timestamp The timestamp of the uuid
+     * @param timestamp The timestamp to use
+     */
+    MutationBatch deleteEdgeFromSource( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+                                        MarkedEdge markedEdge, Collection<Shard> shards,  DirectedEdgeMeta sourceEdgeMeta, UUID timestamp );
+
+
+    /**
+     * Delete the edge from source->target
+     */
+    MutationBatch deleteEdgeFromSourceWithTargetType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+                                                      MarkedEdge markedEdge, Collection<Shard> shards,  DirectedEdgeMeta sourceEdgeMeta,  UUID timestamp );
+
+    /**
+     * Delete the edge from target to source
+     */
+    MutationBatch deleteEdgeToTarget( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+                                      Collection<Shard> shards, DirectedEdgeMeta sourceEdgeMeta,  UUID timestamp );
+
+
+    /**
+     * Delete the edge from target to source with source type
      */
-    MutationBatch deleteEdge( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
-                              UUID timestamp );
+    MutationBatch deleteEdgeToTargetWithSourceType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+                                                    MarkedEdge markedEdge, Collection<Shard> shards,  DirectedEdgeMeta sourceEdgeMeta, UUID timestamp );
+
+    /**
+            * EdgeWrite both the source--->Target edge and the target <----- source edge into the mutation
+            *
+            * @param columnFamilies The column families to use
+            * @param scope The org scope of the graph
+            * @param markedEdge The edge to write
+            * @param timestamp The timestamp to use
+            */
+           MutationBatch deleteEdgeVersions( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+                                              Collection<Shard> shards,  DirectedEdgeMeta sourceEdgeMeta,  UUID timestamp );
 
 
     /**
@@ -65,10 +132,10 @@ public interface ShardedEdgeSerialization {
      * @param columnFamilies The column families to use
      * @param scope The application scope
      * @param search The search criteria
-     * @param shards The shards to iterate when searching
+     * @param shards The shards to multiget when reading
      */
     Iterator<MarkedEdge> getEdgeVersions( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
-                                          SearchByEdge search, Iterator<ShardEntryGroup> shards );
+                                          SearchByEdge search, Collection<Shard> shards );
 
     /**
      * Get an iterator of all edges by edge type originating from source node
@@ -79,7 +146,7 @@ public interface ShardedEdgeSerialization {
      * @param shards The shards to iterate when searching
      */
     Iterator<MarkedEdge> getEdgesFromSource( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
-                                             SearchByEdgeType search, Iterator<ShardEntryGroup> shards );
+                                             SearchByEdgeType search, Collection<Shard> shards );
 
 
     /**
@@ -91,7 +158,7 @@ public interface ShardedEdgeSerialization {
      * @param shards The shards to iterate when searching
      */
     Iterator<MarkedEdge> getEdgesFromSourceByTargetType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
-                                                         SearchByIdType search, Iterator<ShardEntryGroup> shards );
+                                                         SearchByIdType search, Collection<Shard> shards );
 
     /**
      * Get an iterator of all edges by edge type pointing to the target node.  Returns all versions
@@ -102,7 +169,7 @@ public interface ShardedEdgeSerialization {
      * @param shards The shards to iterate when searching
      */
     Iterator<MarkedEdge> getEdgesToTarget( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
-                                           SearchByEdgeType search, Iterator<ShardEntryGroup> shards );
+                                           SearchByEdgeType search, Collection<Shard> shards );
 
 
     /**
@@ -115,5 +182,5 @@ public interface ShardedEdgeSerialization {
      * @param shards The shards to iterate when searching
      */
     Iterator<MarkedEdge> getEdgesToTargetBySourceType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
-                                                       SearchByIdType search, Iterator<ShardEntryGroup> shards );
+                                                       SearchByIdType search, Collection<Shard> shards );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
index d3d92ea..ed3daaf 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
@@ -127,7 +127,7 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
         }
         //column not found, return 0
         catch ( RuntimeException re ) {
-            if(re.getCause() instanceof NotFoundException) {
+            if(re.getCause().getCause() instanceof NotFoundException) {
                 return 0;
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
index b7cc25d..27862d0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
@@ -6,6 +6,7 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
@@ -29,19 +30,18 @@ import com.netflix.astyanax.util.RangeBuilder;
  * @param <C> The column type
  * @param <T> The parsed return type
  */
-public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Comparator<T>,
-        Iterator<List<ScopedRowKey<ApplicationScope, R>>> {
+public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Comparator<T> {
 
     protected final Optional<Edge> last;
     protected final long maxTimestamp;
     protected final ApplicationScope scope;
-    protected final Iterator<ShardEntryGroup> shards;
+    protected final Collection<Shard> shards;
 
 
     protected EdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional<Edge> last,
-                            final Iterator<ShardEntryGroup> shards ) {
+                            final Collection<Shard> shards ) {
 
-        Preconditions.checkArgument(shards.hasNext(), "Cannot search with no possible shards");
+        Preconditions.checkArgument(shards.size() > 0 , "Cannot search with no possible shards");
 
         this.scope = scope;
         this.maxTimestamp = maxTimestamp;
@@ -50,19 +50,12 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Compa
     }
 
 
-    @Override
-    public boolean hasNext() {
-        return shards.hasNext();
-    }
 
+    public List<ScopedRowKey<ApplicationScope, R>> getRowKeys() {
 
-    @Override
-    public List<ScopedRowKey<ApplicationScope, R>> next() {
-        Collection<Shard> readShards = shards.next().getReadShards();
+        List<ScopedRowKey<ApplicationScope, R>> rowKeys = new ArrayList<>(shards.size());
 
-        List<ScopedRowKey<ApplicationScope, R>> rowKeys = new ArrayList<>(readShards.size());
-
-        for(Shard shard : readShards){
+        for(Shard shard : shards){
 
             final ScopedRowKey<ApplicationScope, R> rowKey = ScopedRowKey
                     .fromKey( scope, generateRowKey(shard.getShardIndex() ) );
@@ -75,11 +68,6 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Compa
     }
 
 
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException( "Remove is unsupported" );
-    }
-
 
     /**
      * Set the range on a search
@@ -93,10 +81,7 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Compa
 
             builder.setStart( sourceEdge, getSerializer() );
         }
-        else {
-
 
-        }
     }
 
 
@@ -113,6 +98,7 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Compa
     }
 
 
+
     /**
      * Get the column's serializer
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 6c79814..b919ad7 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -42,7 +42,9 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
+import com.fasterxml.uuid.impl.UUIDUtil;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
@@ -167,7 +169,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
          */
 
         final Iterator<MarkedEdge> edges = directedEdgeMeta
-                .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup, Long.MAX_VALUE );
+                .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup.getReadShards(), Long.MAX_VALUE );
 
 
         if ( !edges.hasNext() ) {
@@ -218,15 +220,18 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
      */
     private boolean isNewNode( DirectedEdgeMeta directedEdgeMeta ) {
 
-        /**
-         * the max time in microseconds we can allow
-         */
-        final long maxTime = ( timeService.getCurrentTime() + graphFig.getShardCacheTimeout() ) * 10000;
+        //The timeout is in milliseconds.  Time for a time uuid is 1/10000 of a milli, so we need to get the units correct
+        final long uuidDelta =  graphFig.getShardCacheTimeout()  * 10000;
+
+        final long timeNow = UUIDGenerator.newTimeUUID().timestamp();
 
         for ( DirectedEdgeMeta.NodeMeta node : directedEdgeMeta.getNodes() ) {
+
             final long uuidTime = node.getId().getUuid().timestamp();
 
-            if ( uuidTime < maxTime ) {
+            final long uuidTimeDelta = uuidTime + uuidDelta;
+
+            if ( uuidTimeDelta < timeNow ) {
                 return true;
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index d3dd86e..d444eec 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -22,10 +22,16 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 import java.beans.PropertyChangeEvent;
 import java.beans.PropertyChangeListener;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
@@ -49,11 +55,19 @@ import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
 import com.google.common.cache.Weigher;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
 
+import rx.Observable;
+import rx.Subscriber;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
 
 /**
  * Simple implementation of the shard.  Uses a local Guava shard with a timeout.  If a value is not present in the
@@ -69,10 +83,14 @@ public class NodeShardCacheImpl implements NodeShardCache {
      */
     private static final int MAX_WEIGHT_PER_ELEMENT = 10000;
 
+
     private final NodeShardAllocation nodeShardAllocation;
     private final GraphFig graphFig;
     private final TimeService timeservice;
 
+
+
+    private ListeningScheduledExecutorService refreshExecutors;
     private LoadingCache<CacheKey, CacheEntry> graphs;
 
 
@@ -93,6 +111,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
         this.graphFig = graphFig;
         this.timeservice = timeservice;
 
+
         /**
          * Add our listener to reconstruct the shard
          */
@@ -102,7 +121,9 @@ public class NodeShardCacheImpl implements NodeShardCache {
                 final String propertyName = evt.getPropertyName();
 
                 if ( propertyName.equals( GraphFig.SHARD_CACHE_SIZE ) || propertyName
-                        .equals( GraphFig.SHARD_CACHE_TIMEOUT ) ) {
+                        .equals( GraphFig.SHARD_CACHE_TIMEOUT ) || propertyName
+                        .equals( GraphFig.SHARD_CACHE_REFRESH_WORKERS ) ) {
+
 
                     updateCache();
                 }
@@ -120,7 +141,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
     public ShardEntryGroup getWriteShardGroup( final ApplicationScope scope, final long timestamp,
                                                final DirectedEdgeMeta directedEdgeMeta ) {
 
-        ValidationUtils.validateApplicationScope(scope);
+        ValidationUtils.validateApplicationScope( scope );
         GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta );
 
         final CacheKey key = new CacheKey( scope, directedEdgeMeta );
@@ -147,7 +168,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
     @Override
     public Iterator<ShardEntryGroup> getReadShardGroup( final ApplicationScope scope, final long maxTimestamp,
                                                         final DirectedEdgeMeta directedEdgeMeta ) {
-        ValidationUtils.validateApplicationScope(scope);
+        ValidationUtils.validateApplicationScope( scope );
         GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta );
 
         final CacheKey key = new CacheKey( scope, directedEdgeMeta );
@@ -175,11 +196,25 @@ public class NodeShardCacheImpl implements NodeShardCache {
      * doesn't have to be precise.  The algorithm accounts for stale data.
      */
     private void updateCache() {
+        if ( this.refreshExecutors != null ) {
+            this.refreshExecutors.shutdown();
+        }
+
+        this.refreshExecutors = MoreExecutors
+                .listeningDecorator( Executors.newScheduledThreadPool( graphFig.getShardCacheRefreshWorkerCount() ) );
+
+
+        this.graphs = CacheBuilder.newBuilder()
+
+                //we want to asynchronously load new values for existing ones, that way we won't have to
+                //wait for a trip to cassandra
+                .refreshAfterWrite( graphFig.getShardCacheTimeout(), TimeUnit.MILLISECONDS )
+
+                        //set our weight function, since not all shards are equal
+                .maximumWeight(MAX_WEIGHT_PER_ELEMENT * graphFig.getShardCacheSize() ).weigher( new ShardWeigher() )
 
-        this.graphs = CacheBuilder.newBuilder().expireAfterWrite( graphFig.getShardCacheSize(), TimeUnit.MILLISECONDS )
-                                  .removalListener( new ShardRemovalListener() )
-                                  .maximumWeight( MAX_WEIGHT_PER_ELEMENT * graphFig.getShardCacheSize() )
-                                  .weigher( new ShardWeigher() ).build( new ShardCacheLoader() );
+                        //set our shard loader
+                .build( new ShardCacheLoader() );
     }
 
 
@@ -296,77 +331,43 @@ public class NodeShardCacheImpl implements NodeShardCache {
 
 
         @Override
-        public CacheEntry load( final CacheKey key ) throws Exception {
+        public CacheEntry load( final CacheKey key ) {
 
 
             final Iterator<ShardEntryGroup> edges =
                     nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta );
 
-            return new CacheEntry( edges );
+            final CacheEntry cacheEntry = new CacheEntry( edges );
+
+            return cacheEntry;
         }
-    }
 
 
-    /**
-     * Calculates the weight of the entry by geting the size of the cache
-     */
-    final class ShardWeigher implements Weigher<CacheKey, CacheEntry> {
-
         @Override
-        public int weigh( final CacheKey key, final CacheEntry value ) {
-            return value.getCacheSize();
+        public ListenableFuture<CacheEntry> reload( final CacheKey key, final CacheEntry oldValue ) throws Exception {
+            ListenableFutureTask<CacheEntry> task = ListenableFutureTask.create( new Callable<CacheEntry>() {
+                public CacheEntry call() {
+                    return load( key );
+                }
+            } );
+            //load via the refresh executor
+            refreshExecutors.execute( task );
+            return task;
         }
+
+        //TODO, use RX for sliding window buffering and duplicate removal
     }
 
 
+
     /**
-     * On removal from the cache, we want to audit the maximum shard.  If it needs to allocate a new shard, we want to
-     * do so. IF there's a compaction pending, we want to run the compaction task
+     * Calculates the weight of the entry by getting the size of the cache
      */
-    final class ShardRemovalListener implements RemovalListener<CacheKey, CacheEntry> {
+    final class ShardWeigher implements Weigher<CacheKey, CacheEntry> {
 
         @Override
-        public void onRemoval( final RemovalNotification<CacheKey, CacheEntry> notification ) {
-
-
-            final CacheKey key = notification.getKey();
-            final CacheEntry entry = notification.getValue();
-
-
-            Iterator<ShardEntryGroup> groups = entry.getShards( Long.MAX_VALUE );
-
-
-            /**
-             * Start at our max, then
-             */
-
-            //audit all our groups
-            while ( groups.hasNext() ) {
-                ShardEntryGroup group = groups.next();
-
-                /**
-                 * We don't have a compaction pending.  Run an audit on the shards
-                 */
-                if ( !group.isCompactionPending() ) {
-                    LOG.debug( "No compaction pending, checking max shard on expiration" );
-                    /**
-                     * Check if we should allocate, we may want to
-                     */
-
-
-                    nodeShardAllocation.auditShard( key.scope, group, key.directedEdgeMeta );
-                    continue;
-                }
-                /**
-                 * Do the compaction
-                 */
-                if ( group.shouldCompact( timeservice.getCurrentTime() ) ) {
-                    //launch the compaction
-                }
-
-                //no op, there's nothing we need to do to this shard
-
-            }
+        public int weigh( final CacheKey key, final CacheEntry value ) {
+            return value.getCacheSize();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
new file mode 100644
index 0000000..8779b96
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
@@ -0,0 +1,130 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.MultiKeyColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.model.ConsistencyLevel;
+import com.netflix.astyanax.query.RowQuery;
+import com.netflix.astyanax.util.RangeBuilder;
+
+
+/**
+ * Iterator that streams results across multiple shard entry groups.  When the iterator for the
+ * current group's read shards is exhausted, it moves on to the next group.
+ *
+ * @param <T> The parsed return type
+ */
+public abstract class ShardGroupColumnIterator<T> implements Iterator<T> {
+
+    //remaining shard groups, and the element iterator of the group currently being read (null until first use)
+    private final Iterator<ShardEntryGroup> entryGroupIterator;
+    private Iterator<T> elements;
+
+
+    public ShardGroupColumnIterator( final Iterator<ShardEntryGroup> entryGroupIterator ){
+        this.entryGroupIterator = entryGroupIterator;
+    }
+
+
+    @Override
+    public boolean hasNext() {
+
+        //nothing has been loaded yet, look for the first group that has elements
+        if(elements == null){
+            return advance();
+        }
+
+        if(elements.hasNext()){
+            return true;
+        }
+
+        //the current group is drained and there are no more shard groups, we can't continue
+        if(!entryGroupIterator.hasNext()){
+            return false;
+        }
+
+        //the current group is drained, move on to the next group that has elements
+        return advance();
+    }
+
+
+    @Override
+    public T next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "There are no more rows or columns left to advance" );
+        }
+
+        return elements.next();
+    }
+
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException( "Remove is unsupported" );
+    }
+
+
+    /**
+     * Get an iterator over the elements of one shard entry group
+     * @param readShards the read shards to use
+     * @return the iterator for those shards.  advance() calls hasNext() on it directly, so it must not be null
+     */
+    protected abstract Iterator<T> getIterator(Collection<Shard> readShards);
+
+    //walks the remaining shard groups until one yields an iterator with elements; false if none does
+    public boolean advance(){
+
+        while(entryGroupIterator.hasNext()){
+
+            final ShardEntryGroup group = entryGroupIterator.next();
+
+            elements = getIterator( group.getReadShards() );
+
+            /**
+             * We're done, this group has some columns to return.  Otherwise try the next group
+             */
+            if(elements.hasNext()){
+                return true;
+            }
+
+        }
+
+
+        return false;
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 48336cd..c566d43 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -22,18 +22,45 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 
+import java.nio.charset.Charset;
+import java.util.BitSet;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
+import org.apache.usergrid.persistence.core.rx.ObservableIterator;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Preconditions;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 import rx.Observable;
+import rx.functions.Action0;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
 
 
 /**
@@ -43,20 +70,56 @@ import rx.Observable;
 public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
 
+    private static final Charset CHARSET = Charset.forName( "UTF-8" );
+
+    private static final HashFunction MURMUR_128 = Hashing.murmur3_128();
+
     private final TimeService timeService;
+    private final GraphFig graphFig;
+    private final NodeShardAllocation nodeShardAllocation;
+    private final ShardedEdgeSerialization shardedEdgeSerialization;
+    private final EdgeColumnFamilies edgeColumnFamilies;
+    private final EdgeShardSerialization edgeShardSerialization;
+
+
+    private final Random random;
+    private final ShardCompactionTaskTracker shardCompactionTaskTracker;
 
 
     @Inject
-    public ShardGroupCompactionImpl( final TimeService timeService ) {this.timeService = timeService;}
+    public ShardGroupCompactionImpl( final TimeService timeService, final GraphFig graphFig,
+                                     final NodeShardAllocation nodeShardAllocation,
+                                     final ShardedEdgeSerialization shardedEdgeSerialization,
+                                     final EdgeColumnFamilies edgeColumnFamilies,
+                                     final EdgeShardSerialization edgeShardSerialization ) {
+
+        this.timeService = timeService;
+        this.graphFig = graphFig;
+        this.nodeShardAllocation = nodeShardAllocation;
+        this.shardedEdgeSerialization = shardedEdgeSerialization;
+        this.edgeColumnFamilies = edgeColumnFamilies;
+        this.edgeShardSerialization = edgeShardSerialization;
+
+        this.random = new Random();
+        this.shardCompactionTaskTracker = new ShardCompactionTaskTracker();
+    }
 
 
     @Override
-    public Observable<Integer> compact( final ShardEntryGroup group ) {
+    public Set<Shard> compact( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+                               final ShardEntryGroup group ) {
         final long startTime = timeService.getCurrentTime();
 
-        Preconditions.checkNotNull(group, "group cannot be null");
+        Preconditions.checkNotNull( group, "group cannot be null" );
         Preconditions.checkArgument( group.isCompactionPending(), "Compaction is pending" );
-        Preconditions.checkArgument( group.shouldCompact(startTime  ), "Compaction can now be run" );
+        Preconditions.checkArgument( group.shouldCompact( startTime ), "Compaction can now be run" );
+
+        /**
+         * It's already compacting, don't do anything
+         */
+        if (!shardCompactionTaskTracker.shouldStartCompaction( scope, edgeMeta, group )){
+            return Collections.emptySet();
+        }
 
 
         final Shard targetShard = group.getCompactionTarget();
@@ -64,12 +127,181 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         final Collection<Shard> sourceShards = group.getReadShards();
 
 
-        //now get iterators for each of the source shards, and then copy them to the compaction target shard
+
+        Observable.create( new ObservableIterator<MarkedEdge>( "Shard_Repair" ) {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+                return edgeMeta.loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, group.getReadShards(), Long.MAX_VALUE );
+            }
+        } ).buffer( graphFig.getScanPageSize() ).doOnNext( new Action1<List<MarkedEdge>>() {
+            @Override
+            public void call( final List<MarkedEdge> markedEdges ) {
+
+            }
+        }).doOnNext( new Action1<List<MarkedEdge>>() {
+            @Override
+            public void call( final List<MarkedEdge> markedEdges ) {
+
+            }
+        } );
+
 
 
 
 
         return null;
+    }
+
+
+    @Override
+    public AuditResult evaluateShardGroup( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+                                           final ShardEntryGroup group ) {
+        //Randomly sampled maintenance of a shard group: audits the group when no compaction is pending,
+        //otherwise starts the pending compaction once the group reports it should compact
+        final double repairChance = random.nextDouble();
+
+        //skip this call when the random draw exceeds the configured shard repair chance
+        if ( repairChance > graphFig.getShardRepairChance() ) {
+            return AuditResult.NOT_CHECKED;
+        }
+
+
+        /**
+         * We don't have a compaction pending.  Run an audit on the shards
+         */
+        if ( !group.isCompactionPending() ) {
+
+            /**
+             * Check if we should allocate, we may want to.  The flag returned by the audit decides the result
+             */
+
+
+            final boolean created = nodeShardAllocation.auditShard( scope, group, edgeMeta );
+
+
+            if ( !created ) {
+                return AuditResult.CHECKED_NO_OP;
+            }
+
+
+            return AuditResult.CHECKED_CREATED;
+        }
+
+        //check our taskmanager
+
+
+        /**
+         * A compaction is pending.  Do the compaction if the group should compact at the current time
+         */
+        if ( group.shouldCompact( timeService.getCurrentTime() ) ) {
+            compact( scope, edgeMeta, group );
+            return AuditResult.COMPACTING;
+        }
+
+        //no op, the compaction is pending but shouldn't run yet, there's nothing we need to do to this shard
+        return AuditResult.NOT_CHECKED;
+    }
+
+
+    private static final class ShardCompactionTaskTracker {
+        //hashes of the compactions currently marked as running, guarded by this tracker's monitor
+        private final Set<HashCode> runningTasks = new HashSet<>();
+
+
+        /**
+         * Sets this data into our scope to signal it's running to stop other threads from attempting to run.
+         * The full hash is kept in a set rather than used as a BitSet index,
+         * since its int form can be negative and BitSet rejects negative indexes.
+         *
+         * @param scope the application scope of the group
+         * @param edgeMeta the edge meta data of the group
+         * @param group the group to compact, identified by its compaction target
+         * @return true if the caller should start the compaction, false if it is already marked as running
+         */
+        public synchronized boolean shouldStartCompaction( final ApplicationScope scope,
+                                                           final DirectedEdgeMeta edgeMeta, ShardEntryGroup group ) {
+            //add returns false when the hash is already present, meaning the task is already running
+            return runningTasks.add( doHash( scope, edgeMeta, group ) );
+        }
+
+
+        /**
+         * Mark this entry group as complete, allowing a later compaction of it to start
+         *
+         * @param scope the application scope of the group
+         * @param edgeMeta the edge meta data of the group
+         * @param group the group whose compaction finished
+         */
+        public synchronized void complete( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+                                           ShardEntryGroup group ) {
+            runningTasks.remove( doHash( scope, edgeMeta, group ) );
+        }
+
+
+        /**
+         * Hash our data into a consistent hash code.  Covers the application id, every node id and node
+         * type, every edge type, and the shard index of the compaction target
+         *
+         * @param scope the application scope, its application id is hashed
+         * @param directedEdgeMeta the edge meta data, its nodes and types are hashed
+         * @param shardEntryGroup the group, the shard index of its compaction target is hashed
+         * @return the 128 bit murmur3 hash of the above
+         */
+        private HashCode doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta,
+                                 final ShardEntryGroup shardEntryGroup ) {
+
+            final Hasher hasher = MURMUR_128.newHasher();
+
+
+            addToHash( hasher, scope.getApplication() );
+
+            /**
+             * add our edge meta data
+             */
+            for ( DirectedEdgeMeta.NodeMeta nodeMeta : directedEdgeMeta.getNodes() ) {
+                addToHash( hasher, nodeMeta.getId() );
+                hasher.putInt( nodeMeta.getNodeType().getStorageValue() );
+            }
+
+
+            /**
+             * Add our edge type
+             */
+            for ( String type : directedEdgeMeta.getTypes() ) {
+                hasher.putString( type, CHARSET );
+            }
+
+            //add our compaction target to the hash
+            final Shard compactionTarget = shardEntryGroup.getCompactionTarget();
+
+            hasher.putLong( compactionTarget.getShardIndex() );
+
+
+            return hasher.hash();
+        }
+
+
+        //adds both halves of the id's uuid, then the id's type, to the hash
+        private void addToHash( final Hasher hasher, final Id id ) {
+
+            final UUID nodeUuid = id.getUuid();
+            final String nodeType = id.getType();
+
+            hasher.putLong( nodeUuid.getMostSignificantBits() ).putLong( nodeUuid.getLeastSignificantBits() )
+                  .putString( nodeType, CHARSET );
+        }
+    }
+
+    private enum StartResult{
+        /**
+         * Returned if the compaction was started
+         */
+
+        STARTED,
 
+        /**
+         * Returned if we are running the compaction
+         */
+        RUNNING;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
deleted file mode 100644
index 0bbb011..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
+++ /dev/null
@@ -1,134 +0,0 @@
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
-import org.apache.usergrid.persistence.core.astyanax.MultiKeyColumnNameIterator;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.model.ConsistencyLevel;
-import com.netflix.astyanax.query.RowQuery;
-import com.netflix.astyanax.util.RangeBuilder;
-
-
-/**
- * Internal iterator to iterate over multiple row keys
- *
- * @param <R> The row type
- * @param <C> The column type
- * @param <T> The parsed return type
- */
-public class ShardRowIterator<R, C, T> implements Iterator<T> {
-
-    private final EdgeSearcher<R, C, T> searcher;
-
-    private final MultiTennantColumnFamily<ApplicationScope, R, C> cf;
-
-    private Iterator<T> currentColumnIterator;
-
-    private final Keyspace keyspace;
-
-    private final int pageSize;
-
-    private final ConsistencyLevel consistencyLevel;
-
-
-    public ShardRowIterator( final EdgeSearcher<R, C, T> searcher,
-                             final MultiTennantColumnFamily<ApplicationScope, R, C> cf, final Keyspace keyspace,
-                             final ConsistencyLevel consistencyLevel, final int pageSize ) {
-        this.searcher = searcher;
-        this.cf = cf;
-        this.keyspace = keyspace;
-        this.pageSize = pageSize;
-        this.consistencyLevel = consistencyLevel;
-    }
-
-
-    @Override
-    public boolean hasNext() {
-        //we have more columns to return
-        if ( currentColumnIterator != null && currentColumnIterator.hasNext() ) {
-            return true;
-        }
-
-        /**
-         * We have another row key, advance to it and re-check
-         */
-        if ( searcher.hasNext() ) {
-            advanceRow();
-            return hasNext();
-        }
-
-        //we have no more columns, and no more row keys, we're done
-        return false;
-    }
-
-
-    @Override
-    public T next() {
-        if ( !hasNext() ) {
-            throw new NoSuchElementException( "There are no more rows or columns left to advance" );
-        }
-
-        return currentColumnIterator.next();
-    }
-
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException( "Remove is unsupported" );
-    }
-
-
-    /**
-     * Advance our iterator to the next row (assumes the check for row keys is elsewhere)
-     */
-    private void advanceRow() {
-
-        /**
-         * If the edge is present, we need to being seeking from this
-         */
-
-        final RangeBuilder rangeBuilder = new RangeBuilder().setLimit( pageSize );
-
-
-        //set the range into the search
-        searcher.setRange( rangeBuilder );
-
-        /**
-         * Get our list of slices
-         */
-        final List<ScopedRowKey<ApplicationScope, R>> rowKeys = searcher.next();
-
-
-        final List<ColumnNameIterator<C, T>> columnNameIterators = new ArrayList<>( rowKeys.size() );
-
-        for(ScopedRowKey<ApplicationScope, R> rowKey: rowKeys){
-
-
-
-           final  RowQuery<ScopedRowKey<ApplicationScope, R>, C> query =
-                    keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKey( rowKey )
-                            .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
-
-
-            final ColumnNameIterator<C, T> columnNameIterator = new ColumnNameIterator<>( query, searcher, searcher.hasPage() );
-
-            columnNameIterators.add( columnNameIterator );
-
-        }
-
-
-
-        currentColumnIterator = new MultiKeyColumnNameIterator<>(columnNameIterators, searcher, pageSize);
-
-
-    }
-}