You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/14 17:20:21 UTC

[2/6] Implemented New rolling shard algorithm. This should allow eventual shard consistency between all clients without the need for an external locking allocation system

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index 5c846f1..4130771 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -20,19 +20,23 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard;
 
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
 
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import org.apache.cassandra.thrift.Mutation;
+
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.ShardKey;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByIdType;
+import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
+import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
@@ -40,11 +44,13 @@ import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import com.google.common.base.Optional;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import static junit.framework.TestCase.assertTrue;
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
@@ -76,53 +82,100 @@ public class NodeShardAllocationTest {
 
         when( graphFig.getShardCacheSize() ).thenReturn( 10000l );
         when( graphFig.getShardSize() ).thenReturn( 20000l );
-        when( graphFig.getShardCacheTimeout()).thenReturn( 30000l );
+
+        final long timeout = 30000;
+        when( graphFig.getShardCacheTimeout() ).thenReturn( timeout );
+        when( graphFig.getShardMinDelta() ).thenReturn( timeout * 2 );
     }
 
 
     @Test
-    public void noShards() {
+    public void minTime() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
-        final NodeShardApproximation nodeShardCounterSerialization =
-                mock( NodeShardApproximation.class );
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+        final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
 
 
         final TimeService timeService = mock( TimeService.class );
 
         final Keyspace keyspace = mock( Keyspace.class );
 
-        final MutationBatch batch = mock( MutationBatch.class );
-
-        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, nodeShardCounterSerialization, timeService,
-                        graphFig );
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardCounterSerialization, timeService, graphFig, keyspace );
 
-        final Id nodeId = createId( "test" );
-        final String type = "type";
-        final String subType = "subType";
 
-        /**
-         * Mock up returning an empty iterator, our audit shouldn't create a new shard
-         */
-        when( edgeShardSerialization
-                .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ),  same( type ),
-                        same( subType ) ) ).thenReturn( Collections.<Long>emptyList().iterator() );
+        final long timeservicetime = System.currentTimeMillis();
 
-        final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+        when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
 
-        assertFalse( "No shard allocated", result );
+        final long expected = timeservicetime - 2 * graphFig.getShardCacheTimeout();
+
+        final long returned = approximation.getMinTime();
+
+        assertEquals( "Correct time was returned", expected, returned );
     }
 
 
+    //    @Test
+    //    public void noShards() {
+    //        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+    //
+    //        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+    //
+    //        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+    //
+    //
+    //        final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
+    //
+    //
+    //        final TimeService timeService = mock( TimeService.class );
+    //
+    //        final Keyspace keyspace = mock( Keyspace.class );
+    //
+    //        final MutationBatch batch = mock( MutationBatch.class );
+    //
+    //        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+    //
+    //        NodeShardAllocation approximation =
+    //                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+    //                        nodeShardCounterSerialization, timeService, graphFig, keyspace );
+    //
+    //        final Id nodeId = createId( "test" );
+    //        final String type = "type";
+    //        final String subType = "subType";
+    //
+    //        /**
+    //         * Mock up returning an empty iterator, our audit shouldn't create a new shard
+    //         */
+    //
+    //        final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType(nodeId, type, subType );
+    //
+    //
+    //        when( edgeShardSerialization
+    //                .getShardMetaData( same( scope ), any( Optional.class ), same(targetEdgeMeta)) ).thenReturn(
+    // Collections.<Shard>emptyList().iterator() );
+    //
+    //        final boolean result = approximation.auditShard(scope, null, targetEdgeMeta);
+    //
+    //        assertFalse( "No shard allocated", result );
+    //    }
+
+
     @Test
-    public void existingFutureShard() {
+    public void existingFutureShardSameTime() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
-        final NodeShardApproximation nodeShardCounterSerialization =
-                mock( NodeShardApproximation.class );
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+        final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
 
 
         final TimeService timeService = mock( TimeService.class );
@@ -136,8 +189,8 @@ public class NodeShardAllocationTest {
 
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, nodeShardCounterSerialization, timeService,
-                        graphFig );
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardCounterSerialization, timeService, graphFig, keyspace );
 
         final Id nodeId = createId( "test" );
         final String type = "type";
@@ -148,16 +201,21 @@ public class NodeShardAllocationTest {
 
         when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
 
-        final long futureShard =  timeservicetime + graphFig.getShardCacheTimeout() * 2 ;
+        final Shard futureShard = new Shard( 10000l, timeservicetime, true );
+
+        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 1000l );
+        shardEntryGroup.addShard( futureShard );
+
+        final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( nodeId, type, subType );
 
         /**
          * Mock up returning a min shard, and a future shard
          */
-        when( edgeShardSerialization
-                .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ),  same( type ),
-                        same( subType ) ) ).thenReturn( Arrays.asList( futureShard ).iterator() );
+        when( edgeShardSerialization.getShardMetaData( same( scope ), any( Optional.class ), same( targetEdgeMeta ) ) )
+                .thenReturn( Arrays.asList( futureShard ).iterator() );
 
-        final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+
+        final boolean result = approximation.auditShard( scope,  shardEntryGroup, targetEdgeMeta );
 
         assertFalse( "No shard allocated", result );
     }
@@ -167,8 +225,11 @@ public class NodeShardAllocationTest {
     public void lowCountFutureShard() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
-        final NodeShardApproximation nodeShardApproximation =
-                mock( NodeShardApproximation.class );
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
 
 
         final TimeService timeService = mock( TimeService.class );
@@ -181,8 +242,8 @@ public class NodeShardAllocationTest {
 
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, nodeShardApproximation, timeService,
-                        graphFig );
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardApproximation, timeService, graphFig, keyspace );
 
         final Id nodeId = createId( "test" );
         final String type = "type";
@@ -193,23 +254,32 @@ public class NodeShardAllocationTest {
 
         when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
 
+        final Shard futureShard = new Shard( 10000l, timeservicetime, true );
+
+        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 1000l );
+        shardEntryGroup.addShard( futureShard );
+
+        final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( nodeId, type, subType );
+
+
+
 
         /**
          * Mock up returning a min shard, and a future shard
          */
         when( edgeShardSerialization
-                .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ),  same( type ),
-                        same( subType ) ) ).thenReturn( Arrays.asList( 0l ).iterator() );
+                .getShardMetaData( same( scope ), any( Optional.class ), same(targetEdgeMeta)) )
+                .thenReturn( Arrays.asList( new Shard( 0l, 0l, true ) ).iterator() );
 
 
         //return a shard size < our max by 1
 
         final long count = graphFig.getShardSize() - 1;
 
-        when( nodeShardApproximation.getCount(scope, nodeId, 0l, type, subType ))
-                                           .thenReturn( count );
+        when( nodeShardApproximation.getCount( scope, futureShard, targetEdgeMeta ) )
+                .thenReturn( count );
 
-        final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+        final boolean result = approximation.auditShard( scope, shardEntryGroup, targetEdgeMeta);
 
         assertFalse( "Shard allocated", result );
     }
@@ -219,22 +289,25 @@ public class NodeShardAllocationTest {
     public void equalCountFutureShard() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
-        final NodeShardApproximation nodeShardApproximation =
-                mock( NodeShardApproximation.class );
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
 
 
         final TimeService timeService = mock( TimeService.class );
 
         final Keyspace keyspace = mock( Keyspace.class );
 
-        final MutationBatch batch = mock(MutationBatch.class);
+        final MutationBatch batch = mock( MutationBatch.class );
 
-        when(keyspace.prepareMutationBatch()).thenReturn( batch );
+        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
 
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, nodeShardApproximation, timeService,
-                        graphFig );
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardApproximation, timeService, graphFig, keyspace );
 
         final Id nodeId = createId( "test" );
         final String type = "type";
@@ -245,69 +318,92 @@ public class NodeShardAllocationTest {
 
         when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
 
+        final Shard futureShard = new Shard( 0l, 0l, true );
+
+        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 1000l );
+        shardEntryGroup.addShard( futureShard );
+
+        final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( nodeId, type, subType );
+
+
 
         /**
          * Mock up returning a min shard
          */
         when( edgeShardSerialization
-                .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ),  same( type ),
-                        same( subType ) ) ).thenReturn( Arrays.asList( 0l ).iterator() );
+                .getShardMetaData( same( scope ), any( Optional.class ), same(targetEdgeMeta) ) )
+                .thenReturn( Arrays.asList( futureShard ).iterator() );
 
 
         final long shardCount = graphFig.getShardSize();
 
         //return a shard size equal to our max
-        when( nodeShardApproximation
-                .getCount(   scope , nodeId, 0l,type , subType  ))
+        when( nodeShardApproximation.getCount( scope, futureShard, targetEdgeMeta) )
                 .thenReturn( shardCount );
 
-        ArgumentCaptor<Long> newUUIDValue = ArgumentCaptor.forClass( Long.class );
+        ArgumentCaptor<Shard> shardValue = ArgumentCaptor.forClass( Shard.class );
 
 
         //mock up our mutation
         when( edgeShardSerialization
-                .writeEdgeMeta( same( scope ), same( nodeId ), newUUIDValue.capture(), same( type ), same( subType ) ) )
-                .thenReturn( mock( MutationBatch.class ) );
+                .writeShardMeta( same( scope ), shardValue.capture(), same(targetEdgeMeta) ) ).thenReturn( mock( MutationBatch.class ) );
 
 
-        final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+        final SimpleMarkedEdge returnedEdge =
+                new SimpleMarkedEdge( nodeId, type, createId( "subType" ), 10005l, false );
+
+        final Iterator<MarkedEdge> edgeIterator = Collections.singleton( ( MarkedEdge ) returnedEdge ).iterator();
+
+        //mock up returning the value
+        when( shardedEdgeSerialization
+                .getEdgesFromSourceByTargetType( same( edgeColumnFamilies ), same( scope ), any( SearchByIdType.class ),
+                        any( Iterator.class ) ) ).thenReturn( edgeIterator );
+
+
+        final boolean result = approximation.auditShard( scope, shardEntryGroup, targetEdgeMeta);
 
         assertTrue( "Shard allocated", result );
 
         //check our new allocated UUID
 
-        final long expectedTime = timeservicetime + 2 * graphFig.getShardCacheTimeout();
 
-        final long savedTimestamp = newUUIDValue.getValue();
+        final long savedTimestamp = shardValue.getValue().getCreatedTime();
 
 
+        assertEquals( "Expected time service time", timeservicetime, savedTimestamp );
 
-        assertEquals( "Expected at 2x timeout generated", expectedTime, savedTimestamp );
-    }
 
+        //now check our max value was set
 
+        final long savedShardPivot = shardValue.getValue().getShardIndex();
+
+        assertEquals( "Expected max value to be the same", returnedEdge.getTimestamp(), savedShardPivot );
+    }
 
 
     @Test
     public void futureCountShardCleanup() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
-        final NodeShardApproximation nodeShardApproximation =
-                mock( NodeShardApproximation.class );
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
 
 
         final TimeService timeService = mock( TimeService.class );
 
         final Keyspace keyspace = mock( Keyspace.class );
 
-        final MutationBatch batch = mock(MutationBatch.class);
+        final MutationBatch batch = mock( MutationBatch.class );
 
-        when(keyspace.prepareMutationBatch()).thenReturn( batch );
+        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
 
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, nodeShardApproximation, timeService,
-                        graphFig );
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardApproximation, timeService, graphFig, keyspace );
 
         final Id nodeId = createId( "test" );
         final String type = "type";
@@ -315,94 +411,133 @@ public class NodeShardAllocationTest {
 
 
         /**
-         * Use the time service to generate UUIDS
+         * Use the time service to generate timestamps
          */
-        final long timeservicetime = System.currentTimeMillis();
+        final long timeservicetime = 10000;
 
 
         when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
 
-        assertTrue("Shard cache mocked", graphFig.getShardCacheTimeout() > 0);
+        assertTrue( "Shard cache mocked", graphFig.getShardCacheTimeout() > 0 );
 
 
         /**
          * Simulates clock drift when 2 nodes create future shards near one another
          */
-        final long futureTime = timeService.getCurrentTime()  + 2 * graphFig.getShardCacheTimeout();
+        final long minDelta = graphFig.getShardMinDelta();
+
+
+        final Shard minShard = new Shard( 0l, 0l, true );
 
+        //a shard that isn't our minimum, but exists after compaction
+        final Shard compactedShard = new Shard( 5000, 1000, true );
 
         /**
-         * Simulate slow node
+         * Simulate different node time allocation
          */
-        final long futureShard1 = futureTime - 1;
 
-        final long futureShard2 = futureTime + 10000;
+        final long minTime = 10000;
+        //our second shard is the "oldest", and hence should be returned in the iterator.  Future shard 1 and 3
+        // should be removed
 
-        final long futureShard3 = futureShard2 + 10000;
+        //this should get dropped, It's allocated after future shard2 even though the time is less
+        final Shard futureShard1 = new Shard( 10000, minTime + minDelta, false );
 
+        //should get kept.
+        final Shard futureShard2 = new Shard( 10005, minTime, false );
 
-        final int pageSize = 100;
+        //should be removed
+        final Shard futureShard3 = new Shard( 10010, minTime + minDelta / 2, false );
+
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( nodeId, type, subType );
 
         /**
          * Mock up returning a min shard
          */
         when( edgeShardSerialization
-                .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ), same( type ),
-                        same( subType ) ) ).thenReturn( Arrays.asList(futureShard3, futureShard2, futureShard1, 0l).iterator() );
+                .getShardMetaData( same( scope ), any( Optional.class ), same(directedEdgeMeta)) ).thenReturn(
+                Arrays.asList( futureShard3, futureShard2, futureShard1, compactedShard, minShard ).iterator() );
 
 
+        ArgumentCaptor<Shard> newLongValue = ArgumentCaptor.forClass( Shard.class );
 
-        ArgumentCaptor<Long> newLongValue = ArgumentCaptor.forClass( Long.class );
 
+        //mock up our mutation
+        when( edgeShardSerialization
+                .removeShardMeta( same( scope ), newLongValue.capture(), same(directedEdgeMeta)) ).thenReturn( mock( MutationBatch.class ) );
 
 
+        final Iterator<ShardEntryGroup> result =
+                approximation.getShards( scope, Optional.<Shard>absent(), directedEdgeMeta);
 
-        //mock up our mutation
-        when( edgeShardSerialization
-                .removeEdgeMeta( same( scope ), same( nodeId ), newLongValue.capture(), same( type ), same( subType ) ) )
-                .thenReturn( mock( MutationBatch.class ) );
 
+        assertTrue( "Shards present", result.hasNext() );
+
+
+        ShardEntryGroup shardEntryGroup = result.next();
+
+        assertEquals( "Future shard returned", futureShard1, shardEntryGroup.getCompactionTarget() );
 
-        final Iterator<Long>
-                result = approximation.getShards( scope, nodeId, Optional.<Long>absent(), type, subType );
+
+        //now verify all 4 are in this group.  This is because the first shard (0,0) (n-1_ may be the only shard other
+        //nodes see while we're rolling our state.  This means it should be read and merged from as well
+
+        Collection<Shard> writeShards = shardEntryGroup.getWriteShards( minTime + minDelta );
+
+        assertEquals( "Shard size as expected", 4, writeShards.size() );
+
+        assertTrue( writeShards.contains( futureShard1 ) );
+        assertTrue( writeShards.contains( futureShard2 ) );
+        assertTrue( writeShards.contains( futureShard3 ) );
+        assertTrue( writeShards.contains( compactedShard ) );
+
+
+        Collection<Shard> readShards = shardEntryGroup.getReadShards();
+
+        assertEquals( "Shard size as expected", 4, readShards.size() );
+
+        assertTrue( readShards.contains( futureShard1 ) );
+        assertTrue( readShards.contains( futureShard2 ) );
+        assertTrue( readShards.contains( futureShard3 ) );
+        assertTrue( readShards.contains( compactedShard ) );
 
 
         assertTrue( "Shards present", result.hasNext() );
 
-        assertEquals("Only single next shard returned", futureShard1,  result.next().longValue());
+        shardEntryGroup = result.next();
 
-        assertTrue("Shards present", result.hasNext());
 
-        assertEquals("Previous shard present", 0l, result.next().longValue());
+        writeShards = shardEntryGroup.getWriteShards( minTime + minDelta );
 
-        assertFalse("No shards left", result.hasNext());
 
-        /**
-         * Now we need to verify that both our mutations have been added
-         */
+        assertTrue( "Previous shard present", writeShards.contains( minShard ) );
 
-        List<Long> values = newLongValue.getAllValues();
 
-        assertEquals("2 values removed", 2,  values.size());
+        writeShards = shardEntryGroup.getReadShards();
 
-        assertEquals("Deleted Max Future", futureShard3, values.get( 0 ).longValue());
-        assertEquals("Deleted Next Future", futureShard2, values.get( 1 ).longValue());
 
-    }
+        assertTrue( "Previous shard present", writeShards.contains( minShard ) );
 
 
+        assertFalse( "No shards left", result.hasNext() );
+    }
 
 
     @Test
-    public void noShardsReturns() {
+    public void noShardsReturns() throws ConnectionException {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
-        final NodeShardApproximation nodeShardApproximation =
-                mock( NodeShardApproximation.class );
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
 
 
         final TimeService timeService = mock( TimeService.class );
 
+        when( timeService.getCurrentTime() ).thenReturn( 10000l );
+
         final Keyspace keyspace = mock( Keyspace.class );
 
         final MutationBatch batch = mock( MutationBatch.class );
@@ -410,25 +545,139 @@ public class NodeShardAllocationTest {
         when( keyspace.prepareMutationBatch() ).thenReturn( batch );
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, nodeShardApproximation, timeService,
-                        graphFig );
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardApproximation, timeService, graphFig, keyspace );
 
         final Id nodeId = createId( "test" );
         final String type = "type";
         final String subType = "subType";
 
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( nodeId, type, subType );
+
+
+
         /**
          * Mock up returning an empty iterator, our audit shouldn't create a new shard
          */
         when( edgeShardSerialization
-                .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ),  same( type ),
-                        same( subType ) ) ).thenReturn( Collections.<Long>emptyList().iterator() );
+                .getShardMetaData( same( scope ), any( Optional.class ), same(directedEdgeMeta) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
+
+
+        ArgumentCaptor<Shard> shardArgumentCaptor = ArgumentCaptor.forClass( Shard.class );
+
+        when( edgeShardSerialization
+                .writeShardMeta( same( scope ), shardArgumentCaptor.capture(), same(directedEdgeMeta)) ).thenReturn( batch );
+
+
+        final Iterator<ShardEntryGroup> result =
+                approximation.getShards( scope, Optional.<Shard>absent(), directedEdgeMeta );
+
+
+        ShardEntryGroup shardEntryGroup = result.next();
+
+        final Shard rootShard = new Shard( 0, 0, true );
+
+        assertEquals( "Shard size expected", 1, shardEntryGroup.entrySize() );
 
-        final Iterator<Long> result = approximation.getShards( scope, nodeId, Optional.<Long>absent(), type, subType );
 
-        assertEquals("0 shard allocated", 0l, result.next().longValue());
+        //ensure we persisted the new shard.
+        assertEquals( "Root shard was persisted", rootShard, shardArgumentCaptor.getValue() );
 
-        assertFalse( "No shard allocated", result.hasNext() );
+
+        //now verify all 4 are in this group.  This is because the first shard (0,0) (n-1_ may be the only shard other
+        //nodes see while we're rolling our state.  This means it should be read and merged from as well
+
+        Collection<Shard> writeShards = shardEntryGroup.getWriteShards( timeService.getCurrentTime() );
+
+        Collection<Shard> readShards = shardEntryGroup.getReadShards();
+
+
+        assertTrue( "root shard allocated", writeShards.contains( rootShard ) );
+
+        assertTrue( "root shard allocated", readShards.contains( rootShard ) );
+
+
+        assertFalse( "No other shard group allocated", result.hasNext() );
     }
 
+
+    @Test
+    public void invalidConfiguration() {
+
+        final GraphFig graphFig = mock( GraphFig.class );
+
+        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
+
+
+        /**
+         * Return 100000 milliseconds
+         */
+        final TimeService timeService = mock( TimeService.class );
+
+        final long time = 100000l;
+
+        when( timeService.getCurrentTime() ).thenReturn( time );
+
+
+        final long cacheTimeout = 30000l;
+
+        when( graphFig.getShardCacheTimeout() ).thenReturn( 30000l );
+
+
+        final long tooSmallDelta = ( long ) ( ( cacheTimeout * 2 ) * .99 );
+
+        when( graphFig.getShardMinDelta() ).thenReturn( tooSmallDelta );
+
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch batch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+
+        NodeShardAllocation approximation =
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardApproximation, timeService, graphFig, keyspace );
+
+
+        /**
+         * Should throw an exception
+         */
+        try {
+            approximation.getMinTime();
+            fail( "Should have thrown a GraphRuntimeException" );
+        }
+        catch ( GraphRuntimeException gre ) {
+            //swallow
+        }
+
+        //now test something that passes.
+
+        final long minDelta = cacheTimeout * 2;
+
+        when( graphFig.getShardMinDelta() ).thenReturn( minDelta );
+
+        long returned = approximation.getMinTime();
+
+        long expectedReturned = time - minDelta;
+
+        assertEquals( expectedReturned, returned );
+
+        final long delta = cacheTimeout * 4;
+
+        when( graphFig.getShardMinDelta() ).thenReturn( delta );
+
+        returned = approximation.getMinTime();
+
+        expectedReturned = time - delta;
+
+        assertEquals( expectedReturned, returned );
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
index 6c46c32..f00a380 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
@@ -22,15 +22,16 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.UUID;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
+import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardCacheImpl;
-import org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
@@ -39,6 +40,9 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
@@ -76,6 +80,8 @@ public class NodeShardCacheTest {
 
         final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
 
+        final TimeService time = mock(TimeService.class);
+
         final Id id = createId( "test" );
 
         final String edgeType = "edge";
@@ -86,78 +92,67 @@ public class NodeShardCacheTest {
         final long newTime = 10000l;
 
 
-        NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
+        NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig, time );
 
 
         final Optional max = Optional.absent();
-        /**
-         * Simulate returning no shards at all.
-         */
-        when( allocation
-                .getShards( same( scope ), same( id ), same( max), same( edgeType ),
-                        same( otherIdType ) ) )
-                .thenReturn( Collections.singletonList( 0l ).iterator() );
-
 
-        long slice = cache.getSlice( scope, id, newTime, edgeType, otherIdType );
 
+        final ShardEntryGroup group = new ShardEntryGroup( newTime );
+        group.addShard( new Shard(0, 0, true) );
 
-        //we return the min UUID possible, all edges should start by writing to this edge
-        assertEquals(0l, slice );
 
+        DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( id, edgeType, otherIdType ) ;
 
         /**
-         * Verify that we fired the audit
+         * Simulate returning no shards at all.
          */
-        verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
-    }
-
-
-    @Test
-    public void testSingleExistingShard() {
-
-        final GraphFig graphFig = getFigMock();
-
-        final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
+        when( allocation.getShards( same( scope ), same( max ), same( directedEdgeMeta ) ) )
 
+                //use "thenAnswer" so we always return the value, even if  it's invoked more than 1 time.
+                .thenAnswer( new Answer<Iterator<ShardEntryGroup>>() {
 
-        final Id id = createId( "test" );
+                    @Override
+                    public Iterator<ShardEntryGroup> answer( final InvocationOnMock invocationOnMock )
+                            throws Throwable {
+                        return Collections.singletonList( group ).iterator();
+                    }
+                });
 
-        final String edgeType = "edge";
 
-        final String otherIdType = "type";
+        ShardEntryGroup returnedGroup = cache.getWriteShardGroup( scope, newTime, directedEdgeMeta );
 
+        //ensure it's the same group
+        assertSame(group, returnedGroup);
 
-        final long newTime = 10000l;
 
-        final long min = 0;
+        Iterator<ShardEntryGroup>
+                shards = cache.getReadShardGroup( scope, newTime, directedEdgeMeta );
 
+        assertTrue(shards.hasNext());
 
-        NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
 
+        returnedGroup = shards.next();
 
-        final Optional max = Optional.absent();
+        assertSame("Single shard group expected", group, returnedGroup);
 
-        /**
-         * Simulate returning single shard
-         */
-        when( allocation.getShards( same( scope ), same( id ), same(max),
-                same( edgeType ), same( otherIdType ) ) ).thenReturn( Collections.singletonList( min ).iterator() );
+        assertFalse(shards.hasNext());
 
 
-        long slice = cache.getSlice( scope, id, newTime, edgeType, otherIdType );
 
 
         //we return the min UUID possible, all edges should start by writing to this edge
-        assertEquals( min, slice );
 
         /**
          * Verify that we fired the audit
+         *
+         * TODO, us the GUAVA Tick to make this happen
          */
-        verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
+//        verify(and allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
     }
 
 
+
     @Test
     public void testRangeShard() {
 
@@ -165,6 +160,8 @@ public class NodeShardCacheTest {
 
         final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
 
+        final TimeService time = mock(TimeService.class);
+
         final Id id = createId( "test" );
 
         final String edgeType = "edge";
@@ -175,150 +172,157 @@ public class NodeShardCacheTest {
         /**
          * Set our min mid and max
          */
-        final long min = 0;
 
+        NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig, time );
+
+
+        final Shard minShard = new Shard(0, 0, true);
+        final Shard midShard = new Shard(10000, 1000, true);
+        final Shard maxShard = new Shard(20000, 2000, true);
+
+
+        /**
+         * Simulate returning all shards
+         */
+        final ShardEntryGroup minShardGroup = new ShardEntryGroup( 10000 );
+        minShardGroup.addShard( minShard );
+
+        final ShardEntryGroup midShardGroup = new ShardEntryGroup( 10000 );
+        midShardGroup.addShard( midShard );
 
-        final long mid = 10000;
 
+        final ShardEntryGroup maxShardGroup = new ShardEntryGroup( 10000 );
+        maxShardGroup.addShard( maxShard );
 
-        final long max = 20000;
 
 
-        NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
+
+        DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( id, edgeType, otherIdType ) ;
 
 
         /**
-         * Simulate returning all shards
+         * Simulate returning no shards at all.
          */
-        when( allocation.getShards( same( scope ), same( id ), any( Optional.class ),
-                same( edgeType ), same( otherIdType ) ) ).thenReturn( Arrays.asList( min, mid, max ).iterator() );
+        when( allocation
+                .getShards( same( scope ), any( Optional.class ), same( directedEdgeMeta ) ) )
 
+                //use "thenAnswer" so we always return the value, even if  it's invoked more than 1 time.
+                .thenAnswer( new Answer<Iterator<ShardEntryGroup>>(){
 
-        //check getting equal to our min, mid and max
+                    @Override
+                    public Iterator<ShardEntryGroup> answer( final InvocationOnMock invocationOnMock )
+                            throws Throwable {
+                        return Arrays.asList( maxShardGroup, midShardGroup, minShardGroup ).iterator();
+                    }
+                });
 
-        long slice = cache.getSlice( scope, id, min, edgeType, otherIdType );
 
+        //check getting equal to our min, mid and max
 
-        //we return the min UUID possible, all edges should start by writing to this edge
-        assertEquals( min, slice );
+        ShardEntryGroup writeShard  = cache.getWriteShardGroup( scope, minShard.getShardIndex(), directedEdgeMeta );
 
-        slice = cache.getSlice( scope, id, mid,
-                edgeType, otherIdType );
+        assertSame(minShardGroup, writeShard);
 
 
-        //we return the mid UUID possible, all edges should start by writing to this edge
-        assertEquals( mid, slice );
+        Iterator<ShardEntryGroup>
+                groups = cache.getReadShardGroup( scope, minShard.getShardIndex(), directedEdgeMeta);
 
-        slice = cache.getSlice( scope, id, max ,
-                edgeType, otherIdType );
+        assertTrue(groups.hasNext());
 
+        assertSame("min shard expected", minShardGroup, groups.next());
 
-        //we return the mid UUID possible, all edges should start by writing to this edge
-        assertEquals( max, slice );
+        assertFalse(groups.hasNext());
 
-        //now test in between
-        slice = cache.getSlice( scope, id, min+1, edgeType, otherIdType );
 
 
-        //we return the min UUID possible, all edges should start by writing to this edge
-        assertEquals( min, slice );
 
-        slice = cache.getSlice( scope, id,   mid-1, edgeType, otherIdType );
+        //mid
+        writeShard = cache.getWriteShardGroup( scope, midShard.getShardIndex(), directedEdgeMeta );
 
+        assertSame(midShardGroup, writeShard);
 
-        //we return the min UUID possible, all edges should start by writing to this edge
-        assertEquals( min, slice );
 
+        groups =  cache.getReadShardGroup( scope, midShard.getShardIndex(), directedEdgeMeta );
 
-        slice = cache.getSlice( scope, id,   mid+1, edgeType, otherIdType );
+        assertTrue(groups.hasNext());
 
+        assertSame("mid shard expected", midShardGroup, groups.next());
 
-        //we return the mid UUID possible, all edges should start by writing to this edge
-        assertEquals( mid, slice );
+        assertTrue(groups.hasNext());
 
-        slice = cache.getSlice( scope, id,  max-1, edgeType, otherIdType );
+        assertSame("min shard expected", minShardGroup, groups.next());
 
+        assertFalse(groups.hasNext());
 
-        //we return the mid UUID possible, all edges should start by writing to this edge
-        assertEquals( mid, slice );
 
 
-        slice = cache.getSlice( scope, id,   max, edgeType, otherIdType );
+        //max
 
+        writeShard = cache.getWriteShardGroup( scope, maxShard.getShardIndex(), directedEdgeMeta );
 
-        //we return the mid UUID possible, all edges should start by writing to this edge
-        assertEquals( max, slice );
+        assertSame(maxShardGroup, writeShard);
 
-        /**
-         * Verify that we fired the audit
-         */
-        verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
-    }
 
+        groups =  cache.getReadShardGroup( scope, maxShard.getShardIndex(), directedEdgeMeta );
 
-    @Test
-    public void testRangeShardIterator() {
+        assertTrue(groups.hasNext());
 
-        final GraphFig graphFig = getFigMock();
+        assertSame("max shard expected", maxShardGroup, groups.next());
 
-        final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
+        assertTrue(groups.hasNext());
 
-        final Id id = createId( "test" );
+        assertSame("mid shard expected", midShardGroup, groups.next());
 
-        final String edgeType = "edge";
 
-        final String otherIdType = "type";
+        assertTrue(groups.hasNext());
 
+        assertSame("min shard expected", minShardGroup, groups.next());
 
-        /**
-         * Set our min mid and max
-         */
-        final long min = 1;
+        assertFalse(groups.hasNext());
 
 
-        final long mid = 100;
 
+        //now test at mid +1 to ensure we get mid + min
+        writeShard = cache.getWriteShardGroup( scope, midShard.getShardIndex() + 1, directedEdgeMeta );
 
-        final long max = 200;
+        assertSame(midShardGroup, writeShard);
 
 
-        NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
+        groups =  cache.getReadShardGroup( scope, midShard.getShardIndex() + 1, directedEdgeMeta );
 
+        assertTrue(groups.hasNext());
 
-        /**
-         * Simulate returning all shards
-         */
-        when( allocation.getShards( same( scope ), same( id ),  any(Optional.class),
-                same( edgeType ), same( otherIdType ) ) ).thenReturn( Arrays.asList( min, mid, max ).iterator() );
+        assertSame("mid shard expected", midShardGroup, groups.next());
 
+        assertTrue(groups.hasNext());
 
-        //check getting equal to our min, mid and max
+        assertSame("min shard expected", minShardGroup, groups.next());
 
-        Iterator<Long> slice =
-                cache.getVersions( scope, id,   max, edgeType, otherIdType );
+        assertFalse(groups.hasNext());
 
 
-        assertEquals( max, slice.next().longValue() );
-        assertEquals( mid, slice.next().longValue() );
-        assertEquals( min, slice.next().longValue() );
 
+        //now test at mid -1 to ensure we get min
+        writeShard = cache.getWriteShardGroup( scope, midShard.getShardIndex() - 1, directedEdgeMeta );
 
-        slice = cache.getVersions( scope, id,   mid,
-                edgeType, otherIdType );
+        assertSame(minShardGroup, writeShard);
 
-        assertEquals( mid, slice.next().longValue() );
-        assertEquals( min, slice.next().longValue() );
 
+        groups =  cache.getReadShardGroup( scope, midShard.getShardIndex() - 1, directedEdgeMeta );
 
-        slice = cache.getVersions( scope, id,   min,
-                edgeType, otherIdType );
 
-        assertEquals( min, slice.next().longValue() );
+        assertTrue(groups.hasNext());
 
+        assertSame("min shard expected", minShardGroup, groups.next());
+
+        assertFalse(groups.hasNext());
 
     }
 
 
+
+
+
     private GraphFig getFigMock() {
         final GraphFig graphFig = mock( GraphFig.class );
         when( graphFig.getShardCacheSize() ).thenReturn( 1000l );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
new file mode 100644
index 0000000..b08da9a
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.Collection;
+import java.util.Set;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Test for the group functionality
+ */
+public class ShardEntryGroupTest {
+
+    @Test
+    public void singleEntry() {
+
+        final long delta = 10000;
+
+        Shard rootShard = new Shard( 0, 0, false );
+
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+        final boolean result = shardEntryGroup.addShard( rootShard );
+
+        assertTrue( "Shard added", result );
+
+        assertFalse( "Single shard cannot be deleted", shardEntryGroup.canBeDeleted( rootShard ) );
+
+        assertNull( "No merge target found", shardEntryGroup.getCompactionTarget() );
+
+        assertFalse( "Merge cannot be run with a single shard", shardEntryGroup.shouldCompact( Long.MAX_VALUE ) );
+    }
+
+
+    @Test
+    public void allocatedWithinDelta() {
+
+        final long delta = 10000;
+
+        Shard firstShard = new Shard( 1000, 1000, false );
+
+        Shard secondShard = new Shard( 1000, 1001, false );
+
+
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+        boolean result = shardEntryGroup.addShard( secondShard );
+
+        assertTrue( "Shard added", result );
+
+        result = shardEntryGroup.addShard( firstShard );
+
+        assertTrue( " Shard added", result );
+
+
+        assertFalse( "First shard cannot be deleted", shardEntryGroup.canBeDeleted( firstShard ) );
+
+        assertFalse( "Second shard cannot be deleted", shardEntryGroup.canBeDeleted( secondShard ) );
+
+        assertFalse( "Duplicate shard id cannot be deleted", shardEntryGroup.canBeDeleted( secondShard ) );
+
+        assertNull( "Can't compact, no min compacted shard present", shardEntryGroup.getCompactionTarget() );
+
+
+        //TODO, should this blow up in general?  We don't have a compacted shard at the lower bounds, which shouldn't be allowed
+
+    }
+
+
+    @Test
+    public void testShardTarget() {
+
+        final long delta = 10000;
+
+        Shard compactedShard = new Shard( 0, 0, true );
+
+        Shard firstShard = new Shard( 1000, 1000, false );
+
+        Shard secondShard = new Shard( 1000, 1001, false );
+
+
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+        boolean result = shardEntryGroup.addShard( secondShard );
+
+
+        assertTrue( "Shard added", result );
+
+        result = shardEntryGroup.addShard( firstShard );
+
+        assertTrue( "Shard added", result );
+
+        result = shardEntryGroup.addShard( compactedShard );
+
+        assertTrue( " Shard added", result );
+
+
+        assertFalse( "First shard cannot be deleted", shardEntryGroup.canBeDeleted( firstShard ) );
+
+        assertFalse( "Second shard cannot be deleted", shardEntryGroup.canBeDeleted( secondShard ) );
+
+        assertFalse( "Duplicate shard id cannot be deleted", shardEntryGroup.canBeDeleted( secondShard ) );
+
+        assertEquals( "Min compaction target found", firstShard, shardEntryGroup.getCompactionTarget() );
+
+        //shouldn't return true, since we haven't passed delta time in the second shard
+        assertFalse( "Merge cannot be run within min time",
+                shardEntryGroup.shouldCompact( firstShard.getCreatedTime() + delta ) );
+
+        //shouldn't return true, since we haven't passed delta time in the second shard
+        assertFalse( "Merge cannot be run within min time",
+                shardEntryGroup.shouldCompact( secondShard.getCreatedTime() + delta ) );
+
+        //we haven't passed the delta in the neighbor that would be our source, shard2, we shouldn't return true
+        //we read from shard2 and write to shard1
+        assertFalse( "Merge cannot be run with after min time",
+                shardEntryGroup.shouldCompact( firstShard.getCreatedTime() + delta + 1 ) );
+
+        assertTrue( "Merge should be run with after min time",
+                shardEntryGroup.shouldCompact( secondShard.getCreatedTime() + delta + 1 ) );
+    }
+
+
+    @Test
+    public void multipleShardGroups() {
+
+        final long delta = 10000;
+
+        Shard firstShard = new Shard( 1000, 10000, false );
+
+        Shard secondShard = new Shard( 999, 9000, false );
+
+        Shard compactedShard1 = new Shard( 900, 8000, true );
+
+        Shard compactedShard2 = new Shard( 800, 7000, true );
+
+
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+        boolean result = shardEntryGroup.addShard( firstShard );
+
+        assertTrue( "Shard added", result );
+
+        result = shardEntryGroup.addShard( secondShard );
+
+        assertTrue( " Shard added", result );
+
+        result = shardEntryGroup.addShard( compactedShard1 );
+
+        assertTrue( "Shard added", result );
+
+        result = shardEntryGroup.addShard( compactedShard2 );
+
+        assertFalse( "Shouldn't add since it's compacted", result );
+
+        ShardEntryGroup secondGroup = new ShardEntryGroup( delta );
+
+        result = secondGroup.addShard( compactedShard2 );
+
+        assertTrue( "Added successfully", result );
+    }
+
+
+    @Test
+    public void boundShardGroup() {
+
+        final long delta = 10000;
+
+        Shard firstShard = new Shard( 1000, 10000, false );
+
+        Shard secondShard = new Shard( 999, 9000, false );
+
+        Shard compactedShard1 = new Shard( 900, 8000, true );
+
+
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+        boolean result = shardEntryGroup.addShard( firstShard );
+
+        assertTrue( "Shard added", result );
+
+        result = shardEntryGroup.addShard( secondShard );
+
+        assertTrue( " Shard added", result );
+
+        result = shardEntryGroup.addShard( compactedShard1 );
+
+        assertTrue( "Shard added", result );
+
+
+        assertTrue( "Shard can be deleted", shardEntryGroup.canBeDeleted( firstShard ) );
+
+        assertFalse( "Compaction shard shard cannot be deleted", shardEntryGroup.canBeDeleted( secondShard ) );
+
+        assertEquals( "Same shard for merge target", secondShard, shardEntryGroup.getCompactionTarget() );
+
+        //shouldn't return true, since we haven't passed delta time in the second shard
+        assertFalse( "Merge cannot be run within min time",
+                shardEntryGroup.shouldCompact( firstShard.getCreatedTime() + delta ) );
+
+        //shouldn't return true, since we haven't passed delta time in the second shard
+        assertFalse( "Merge cannot be run within min time",
+                shardEntryGroup.shouldCompact( secondShard.getCreatedTime() + delta ) );
+
+
+        assertFalse( "Merge cannot be run within min time",
+                shardEntryGroup.shouldCompact( secondShard.getCreatedTime() + delta + 1 ) );
+
+        assertTrue( "Merge should be run with after min time", shardEntryGroup.shouldCompact( firstShard.getCreatedTime() + delta + 1 ) );
+    }
+
+
+    /**
+     * Ensures that we read from all shards (even the compacted one)
+     */
+    @Test
+    public void getAllReadShards() {
+
+        final long delta = 10000;
+
+        Shard firstShard = new Shard( 1000, 10000, false );
+
+        Shard secondShard = new Shard( 999, 9000, false );
+
+        Shard compactedShard1 = new Shard( 900, 8000, true );
+
+
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+        boolean result = shardEntryGroup.addShard( firstShard );
+
+        assertTrue( "Shard added", result );
+
+        result = shardEntryGroup.addShard( secondShard );
+
+        assertTrue( " Shard added", result );
+
+        result = shardEntryGroup.addShard( compactedShard1 );
+
+        assertTrue( "Shard added", result );
+
+        Collection<Shard> readShards = shardEntryGroup.getReadShards();
+
+        assertEquals("Shard size correct", 3, readShards.size());
+
+        assertTrue("First shard present",  readShards.contains( firstShard ) );
+
+        assertTrue("Second shard present",  readShards.contains( firstShard ) );
+
+        assertTrue("Third shard present",  readShards.contains( firstShard ) );
+
+    }
+
+
+    /**
+     * Ensures that we read from all shards (even the compacted one)
+     */
+    @Test
+    public void getAllWriteShardsNotPastCompaction() {
+
+        final long delta = 10000;
+
+        Shard firstShard = new Shard( 1000, 10000, false );
+
+        Shard secondShard = new Shard( 999, 9000, false );
+
+        Shard compactedShard = new Shard( 900, 8000, true );
+
+
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+        boolean result = shardEntryGroup.addShard( firstShard );
+
+        assertTrue( "Shard added", result );
+
+        result = shardEntryGroup.addShard( secondShard );
+
+        assertTrue( " Shard added", result );
+
+        result = shardEntryGroup.addShard( compactedShard );
+
+        assertTrue( "Shard added", result );
+
+
+
+        Collection<Shard> writeShards = shardEntryGroup.getWriteShards( firstShard.getCreatedTime() + delta );
+
+        assertEquals("Shard size correct", 3, writeShards.size());
+
+        assertTrue("First shard present",  writeShards.contains( firstShard ) );
+
+        assertTrue("Second shard present",  writeShards.contains( secondShard ) );
+
+        assertTrue("Third shard present",  writeShards.contains( compactedShard ) );
+
+
+
+        writeShards = shardEntryGroup.getWriteShards(secondShard.getCreatedTime()+delta);
+
+        assertEquals("Shard size correct", 3, writeShards.size());
+
+        assertTrue("First shard present",  writeShards.contains( firstShard ) );
+
+        assertTrue("Second shard present",  writeShards.contains( secondShard ) );
+
+        assertTrue("Third shard present",  writeShards.contains( compactedShard ) );
+
+
+        /**
+         * Not the max created timestamp, shouldn't return less than all shards
+         */
+        writeShards = shardEntryGroup.getWriteShards(secondShard.getCreatedTime() +1 + delta);
+
+        assertEquals("Shard size correct", 3, writeShards.size());
+
+        assertTrue("First shard present",  writeShards.contains( firstShard ) );
+
+        assertTrue("Second shard present",  writeShards.contains( secondShard ) );
+
+        assertTrue("Third shard present",  writeShards.contains( compactedShard ) );
+
+
+
+        assertEquals("Compaction target correct", secondShard, shardEntryGroup.getCompactionTarget());
+
+        writeShards = shardEntryGroup.getWriteShards(firstShard.getCreatedTime() +1 + delta);
+
+        assertEquals("Shard size correct", 1, writeShards.size());
+
+
+        assertTrue("Second shard present",  writeShards.contains( secondShard ) );
+
+    }
+
+
+    @Test(expected=IllegalArgumentException.class)
+    public void failsInsertionOrder() {
+
+        final long delta = 10000;
+
+        Shard secondShard = new Shard(20000, 10000, false);
+
+        Shard firstShard = new Shard(10000 , 10000, false );
+
+        Shard rootShard = new Shard( 0, 0, false );
+
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+        boolean result = shardEntryGroup.addShard( secondShard );
+
+        assertTrue( "Shard added", result );
+
+        result = shardEntryGroup.addShard( rootShard );
+
+        assertTrue( "Shard added", result );
+
+        //this should blow up, we can't add a shard in the middle, it must always be greater than the current max
+
+        shardEntryGroup.addShard( firstShard );
+
+
+    }
+
+
+
+    @Test
+    public void shardEntryAddList() {
+
+        final long delta = 10000;
+
+        Shard highShard = new Shard( 30000, 1000, false );
+
+        Shard midShard = new Shard( 20000, 1000, true );
+
+        Shard lowShard = new Shard( 10000, 1000, false);
+
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+        boolean result = shardEntryGroup.addShard( highShard );
+
+        assertTrue( "Shard added", result );
+
+        result = shardEntryGroup.addShard( midShard );
+
+        assertTrue( "Shard added", result );
+
+        result = shardEntryGroup.addShard( lowShard );
+
+        assertFalse( "Shard added", result );
+    }
+
+
+
+
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
index da19ce5..fd8ff26 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
@@ -39,12 +39,17 @@ import org.junit.Test;
 import org.safehaus.guicyfig.Bypass;
 import org.safehaus.guicyfig.OptionState;
 import org.safehaus.guicyfig.Overrides;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
@@ -68,6 +73,7 @@ import static org.mockito.Mockito.when;
 
 public class NodeShardApproximationTest {
 
+    private static final Logger LOG = LoggerFactory.getLogger( NodeShardApproximation.class );
 
     private GraphFig graphFig;
 
@@ -92,35 +98,38 @@ public class NodeShardApproximationTest {
 
         when( graphFig.getShardCacheSize() ).thenReturn( 10000l );
         when( graphFig.getShardSize() ).thenReturn( 250000l );
+        when( graphFig.getCounterFlushQueueSize() ).thenReturn( 10000 );
 
         nodeShardCounterSerialization = mock( NodeShardCounterSerialization.class );
 
-        when(nodeShardCounterSerialization.flush( any(Counter.class) )).thenReturn( mock( MutationBatch.class) );
-
+        when( nodeShardCounterSerialization.flush( any( Counter.class ) ) ).thenReturn( mock( MutationBatch.class ) );
 
 
         timeService = mock( TimeService.class );
 
-        when(timeService.getCurrentTime()).thenReturn( System.currentTimeMillis() );
+        when( timeService.getCurrentTime() ).thenReturn( System.currentTimeMillis() );
     }
 
 
     @Test
-    public void testSingleShard() {
-
+    public void testSingleShard() throws InterruptedException {
 
-        when(graphFig.getCounterFlushCount()).thenReturn( 100000l );
 
+        when( graphFig.getCounterFlushCount() ).thenReturn( 100000l );
         NodeShardApproximation approximation =
                 new NodeShardApproximationImpl( graphFig, nodeShardCounterSerialization, timeService );
 
 
         final Id id = createId( "test" );
-        final long shardId = 0l;
+        final Shard shard = new Shard(0, 0, true);
         final String type = "type";
         final String type2 = "subType";
 
-        long count = approximation.getCount( scope, id, shardId, type, type2 );
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( id, type, type2 );
+
+        long count = approximation.getCount( scope, shard, directedEdgeMeta);
+
+        waitForFlush( approximation );
 
         assertEquals( 0, count );
     }
@@ -130,8 +139,6 @@ public class NodeShardApproximationTest {
     public void testSingleShardMultipleThreads() throws ExecutionException, InterruptedException {
 
 
-
-
         NodeShardCounterSerialization serialization = new TestNodeShardCounterSerialization();
 
         final NodeShardApproximation approximation =
@@ -144,8 +151,10 @@ public class NodeShardApproximationTest {
         final Id id = createId( "test" );
         final String type = "type";
         final String type2 = "subType";
-        final long shardId = 10000;
 
+        final Shard shard = new Shard(10000, 0, true);
+
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( id, type, type2 );
 
         ExecutorService executor = Executors.newFixedThreadPool( workers );
 
@@ -158,7 +167,7 @@ public class NodeShardApproximationTest {
                 public Long call() throws Exception {
 
                     for ( int i = 0; i < increments; i++ ) {
-                        approximation.increment( scope, id, shardId, 1, type, type2 );
+                        approximation.increment( scope, shard, 1, directedEdgeMeta );
                     }
 
                     return 0l;
@@ -169,24 +178,25 @@ public class NodeShardApproximationTest {
         }
 
 
-
         for ( Future<Long> future : futures ) {
-           future.get();
+            future.get();
         }
 
-
+        waitForFlush( approximation );
         //get our count.  It should be accurate b/c we only have 1 instance
 
-        final long returnedCount = approximation.getCount( scope, id, shardId, type, type2);
+        final long returnedCount = approximation.getCount( scope, shard, directedEdgeMeta );
         final long expected = workers * increments;
 
 
-        assertEquals(expected, returnedCount);
-
+        assertEquals( expected, returnedCount );
 
+        //test we get nothing with the other type
 
+        final long emptyCount = approximation.getCount( scope, shard,  DirectedEdgeMeta.fromSourceNodeTargetType( id, type, type2 ));
 
 
+        assertEquals( 0, emptyCount );
     }
 
 
@@ -195,8 +205,6 @@ public class NodeShardApproximationTest {
     public void testMultipleShardMultipleThreads() throws ExecutionException, InterruptedException {
 
 
-
-
         NodeShardCounterSerialization serialization = new TestNodeShardCounterSerialization();
 
         final NodeShardApproximation approximation =
@@ -210,27 +218,32 @@ public class NodeShardApproximationTest {
         final String type = "type";
         final String type2 = "subType";
 
-        final AtomicLong shardIdCounter = new AtomicLong(  );
+        final AtomicLong shardIdCounter = new AtomicLong();
+
+
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( id, type, type2 );
 
 
 
         ExecutorService executor = Executors.newFixedThreadPool( workers );
 
-        List<Future<Long>> futures = new ArrayList<>( workers );
+        List<Future<Shard>> futures = new ArrayList<>( workers );
 
         for ( int i = 0; i < workers; i++ ) {
 
-            final Future<Long> future = executor.submit( new Callable<Long>() {
+            final Future<Shard> future = executor.submit( new Callable<Shard>() {
                 @Override
-                public Long call() throws Exception {
+                public Shard call() throws Exception {
 
                     final long threadShardId = shardIdCounter.incrementAndGet();
 
+                    final Shard shard = new Shard( threadShardId, 0, true );
+
                     for ( int i = 0; i < increments; i++ ) {
-                        approximation.increment( scope, id, threadShardId, 1, type, type2 );
+                        approximation.increment( scope, shard, 1, directedEdgeMeta );
                     }
 
-                    return threadShardId;
+                    return shard;
                 }
             } );
 
@@ -238,29 +251,41 @@ public class NodeShardApproximationTest {
         }
 
 
+        for ( Future<Shard> future : futures ) {
+            final Shard shardId = future.get();
 
-        for ( Future<Long> future : futures ) {
-           final long shardId = future.get();
+            waitForFlush( approximation );
 
-            final long returnedCount = approximation.getCount( scope, id, shardId, type, type2);
+            final long returnedCount = approximation.getCount( scope, shardId, directedEdgeMeta);
 
-            assertEquals(increments, returnedCount);
+            assertEquals( increments, returnedCount );
         }
+    }
+
+
+    private void waitForFlush( NodeShardApproximation approximation ) throws InterruptedException {
 
+        approximation.beginFlush();
 
+        while ( approximation.flushPending() ) {
 
+            LOG.info("Waiting on beginFlush to complete");
 
+            Thread.sleep( 100 );
+        }
     }
 
 
+
     /**
      * These are created b/c we can't use Mockito.  It OOM's with keeping track of all the mock invocations
      */
 
-    private static class TestNodeShardCounterSerialization implements NodeShardCounterSerialization{
+    private static class TestNodeShardCounterSerialization implements NodeShardCounterSerialization {
 
         private Counter copy = new Counter();
 
+
         @Override
         public MutationBatch flush( final Counter counter ) {
             copy.merge( counter );
@@ -281,7 +306,6 @@ public class NodeShardApproximationTest {
     }
 
 
-
     /**
      * Simple test mutation to no-op during tests
      */
@@ -415,14 +439,14 @@ public class NodeShardApproximationTest {
     }
 
 
-
-    private static class TestGraphFig implements GraphFig{
+    private static class TestGraphFig implements GraphFig {
 
         @Override
         public int getScanPageSize() {
             return 0;  //To change body of implemented methods use File | Settings | File Templates.
         }
 
+
         @Override
         public int getRepairConcurrentSize() {
             return 0;  //To change body of implemented methods use File | Settings | File Templates.
@@ -442,6 +466,12 @@ public class NodeShardApproximationTest {
 
 
         @Override
+        public long getShardMinDelta() {
+            return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        }
+
+
+        @Override
         public long getShardCacheSize() {
             return 0;  //To change body of implemented methods use File | Settings | File Templates.
         }
@@ -460,6 +490,12 @@ public class NodeShardApproximationTest {
 
 
         @Override
+        public int getCounterFlushQueueSize() {
+            return 10000;
+        }
+
+
+        @Override
         public void addPropertyChangeListener( final PropertyChangeListener propertyChangeListener ) {
             //To change body of implemented methods use File | Settings | File Templates.
         }
@@ -555,7 +591,8 @@ public class NodeShardApproximationTest {
         }
     }
 
-    private static class TestTimeService implements TimeService{
+
+    private static class TestTimeService implements TimeService {
 
         @Override
         public long getCurrentTime() {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java
index 9968f67..9edc66a 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java
@@ -35,6 +35,9 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
@@ -96,11 +99,11 @@ public class NodeShardCounterSerializationTest {
 
         final Id id = createId( "test" );
 
-        ShardKey key1 = new ShardKey( scope, id, 0, "type1" );
+        ShardKey key1 = new ShardKey( scope, new Shard(0, 0, false), DirectedEdgeMeta.fromSourceNode( id, "type1"  ) );
 
-        ShardKey key2 = new ShardKey( scope, id, 0, "type2" );
+        ShardKey key2 = new ShardKey( scope, new Shard(0, 0, false), DirectedEdgeMeta.fromSourceNode( id, "type2"  ) );
 
-        ShardKey key3 = new ShardKey( scope, id, 1, "type1" );
+        ShardKey key3 = new ShardKey( scope, new Shard(1, 0, false), DirectedEdgeMeta.fromSourceNode( id, "type1"  ) );
 
 
         Counter counter = new Counter();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
new file mode 100644
index 0000000..5b20647
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
@@ -0,0 +1,232 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+
+public class ShardEntryGroupIteratorTest {
+
+    @Test(expected = IllegalArgumentException.class)
+    public void noShards(){
+        final long delta = 10000;
+        final Iterator<Shard> noShards = Collections.<Shard>emptyList().iterator();
+
+        //should blow up, our iterator is empty
+        new ShardEntryGroupIterator(noShards, delta);
+
+    }
+
+    @Test
+    public void existingSingleShard(){
+
+        final Shard minShard = new Shard(0, 0, true);
+        final long delta = 10000;
+        final Iterator<Shard> noShards = Collections.singleton( minShard ).iterator();
+
+        ShardEntryGroupIterator entryGroupIterator = new ShardEntryGroupIterator(noShards, delta);
+
+        assertTrue("Root shard always present", entryGroupIterator.hasNext());
+
+        ShardEntryGroup group = entryGroupIterator.next();
+
+        assertNotNull("Group returned", group);
+
+        Collection<Shard> readShards = group.getReadShards();
+
+        assertEquals("Min shard present", 1, readShards.size());
+
+        assertTrue("Min shard present", readShards.contains( minShard ));
+
+
+        Collection<Shard> writeShards = group.getWriteShards( 0 );
+
+        assertEquals("Min shard present", 1, writeShards.size());
+
+        assertTrue("Min shard present", writeShards.contains( minShard ));
+
+
+        writeShards = group.getWriteShards( Long.MAX_VALUE );
+
+        assertEquals("Min shard present", 1, writeShards.size());
+
+        assertTrue("Min shard present", writeShards.contains( minShard ));
+
+
+    }
+
+
+    /**
+     * Tests the iterator constructs boundaries between groups correctly.  In a "real" runtime environment, I expect
+     * that only the last 1 or 2 groups will actually have more than 1 entry.
+     */
+    @Test
+    public void boundedShardSets(){
+
+        /**
+         * Next shard group
+         */
+        final Shard shardGroup1Shard1 = new Shard(0, 0, true);
+
+        final Shard shardGroup1Shard2 = new Shard(10000, 100, false);
+
+        final Shard shardGroup1Shard3 = new Shard(20000, 200, false);
+
+
+        /**
+         * Middle shard group
+         */
+        final Shard shardGroup2Shard1 = new Shard(30000, 300, true);
+
+        final Shard shardGroup2Shard2 = new Shard(40000, 400, false);
+
+
+        /**
+         * Highest shard group
+         */
+
+        final Shard shardGroup3Shard1 = new Shard(50000, 500, true);
+
+        final Shard shardGroup3Shard2 = new Shard(60000, 600, false);
+
+        final Shard shardGroup3Shard3 = new Shard(70000, 700, false);
+
+
+
+
+        final long delta = 10000;
+        final Iterator<Shard> noShards = Arrays.asList(shardGroup3Shard3, shardGroup3Shard2, shardGroup3Shard1, shardGroup2Shard2, shardGroup2Shard1, shardGroup1Shard3, shardGroup1Shard2, shardGroup1Shard1 ).iterator();
+
+
+
+
+        ShardEntryGroupIterator entryGroupIterator = new ShardEntryGroupIterator(noShards, delta);
+
+        assertTrue("max group present", entryGroupIterator.hasNext());
+
+        ShardEntryGroup group = entryGroupIterator.next();
+
+        assertNotNull("Group returned", group);
+
+        Collection<Shard> readShards = group.getReadShards();
+
+        assertEquals("Min shard present", 3, readShards.size());
+
+        assertTrue("shardGroup3Shard3 shard present", readShards.contains( shardGroup3Shard3 ));
+
+        assertTrue("shardGroup3Shard2 shard present", readShards.contains( shardGroup3Shard2 ));
+
+        assertTrue("shardGroup3Shard1 shard present", readShards.contains( shardGroup3Shard1 ));
+
+
+        Collection<Shard> writeShards = group.getWriteShards( 0 );
+
+        assertEquals("Min shard present", 3, writeShards.size());
+
+        assertTrue("shardGroup3Shard3 shard present", writeShards.contains( shardGroup3Shard3 ));
+
+        assertTrue("shardGroup3Shard2 shard present", writeShards.contains( shardGroup3Shard2 ));
+
+        assertTrue("shardGroup3Shard1 shard present", writeShards.contains( shardGroup3Shard1 ));
+
+
+
+
+
+        assertTrue("middle group present", entryGroupIterator.hasNext());
+
+        group = entryGroupIterator.next();
+
+        assertNotNull("Group returned", group);
+
+       readShards = group.getReadShards();
+
+        assertEquals("Min shard present", 2, readShards.size());
+
+        assertTrue("shardGroup2Shard1 shard present", readShards.contains( shardGroup2Shard1 ));
+
+        assertTrue("shardGroup2Shard2 shard present", readShards.contains( shardGroup2Shard2 ));
+
+
+
+        writeShards = group.getWriteShards( 0 );
+
+        assertEquals("Min shard present", 2, writeShards.size());
+
+        assertTrue("shardGroup2Shard1 shard present", writeShards.contains( shardGroup2Shard1 ));
+
+        assertTrue("shardGroup2Shard2 shard present", writeShards.contains( shardGroup2Shard2 ));
+
+
+
+
+
+
+        assertTrue("min group present", entryGroupIterator.hasNext());
+
+        group = entryGroupIterator.next();
+
+        assertNotNull("Group returned", group);
+
+        readShards = group.getReadShards();
+
+        assertEquals("Min shard present", 3, readShards.size());
+
+        assertTrue("shardGroup1Shard3 shard present", readShards.contains( shardGroup1Shard3 ));
+
+        assertTrue("shardGroup1Shard2 shard present", readShards.contains( shardGroup1Shard2 ));
+
+        assertTrue("shardGroup1Shard1 shard present", readShards.contains( shardGroup1Shard1 ));
+
+
+
+        writeShards = group.getWriteShards( 0 );
+
+        assertEquals("Min shard present", 3, writeShards.size());
+
+        assertTrue("shardGroup1Shard3 shard present", writeShards.contains( shardGroup1Shard3 ));
+
+        assertTrue("shardGroup1Shard2 shard present", writeShards.contains( shardGroup1Shard2 ));
+
+        assertTrue("shardGroup1Shard1 shard present", writeShards.contains( shardGroup1Shard1 ));
+
+
+
+
+
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 4716533..9565543 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -55,7 +55,7 @@
         <junit.version>4.11</junit.version>
         <kryo-serializers.version>0.26</kryo-serializers.version>
         <log4j.version>1.2.17</log4j.version>
-        <rx.version>0.19.0</rx.version>
+        <rx.version>0.19.6</rx.version>
         <slf4j.version>1.7.2</slf4j.version>
         <surefire.version>2.16</surefire.version>