You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/05 00:15:02 UTC

[09/11] Checkpoint, still a WIP. Broken stuff.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ecd603f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index 42b3dc2..8c60e2c 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -20,9 +20,9 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard;
 
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -33,6 +33,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByIdType;
+import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
 import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -46,6 +47,7 @@ import static junit.framework.TestCase.assertTrue;
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
@@ -77,21 +79,22 @@ public class NodeShardAllocationTest {
 
         when( graphFig.getShardCacheSize() ).thenReturn( 10000l );
         when( graphFig.getShardSize() ).thenReturn( 20000l );
-        when( graphFig.getShardCacheTimeout()).thenReturn( 30000l );
-    }
 
+        final long timeout = 30000;
+        when( graphFig.getShardCacheTimeout() ).thenReturn( timeout );
+        when( graphFig.getShardMinDelta() ).thenReturn( timeout * 2 );
+    }
 
 
     @Test
     public void minTime() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
-        final EdgeColumnFamilies edgeColumnFamilies = mock(EdgeColumnFamilies.class);
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
 
-        final ShardedEdgeSerialization shardedEdgeSerialization = mock(ShardedEdgeSerialization.class);
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
 
-        final NodeShardApproximation nodeShardCounterSerialization =
-                mock( NodeShardApproximation.class );
+        final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
 
 
         final TimeService timeService = mock( TimeService.class );
@@ -100,8 +103,8 @@ public class NodeShardAllocationTest {
 
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies,  shardedEdgeSerialization, nodeShardCounterSerialization, timeService,
-                        graphFig, keyspace );
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardCounterSerialization, timeService, graphFig, keyspace );
 
 
         final long timeservicetime = System.currentTimeMillis();
@@ -112,7 +115,7 @@ public class NodeShardAllocationTest {
 
         final long returned = approximation.getMinTime();
 
-        assertEquals("Correct time was returned", expected, returned);
+        assertEquals( "Correct time was returned", expected, returned );
     }
 
 
@@ -120,13 +123,12 @@ public class NodeShardAllocationTest {
     public void noShards() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
-        final EdgeColumnFamilies edgeColumnFamilies = mock(EdgeColumnFamilies.class);
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
 
-        final ShardedEdgeSerialization shardedEdgeSerialization = mock(ShardedEdgeSerialization.class);
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
 
 
-        final NodeShardApproximation nodeShardCounterSerialization =
-                mock( NodeShardApproximation.class );
+        final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
 
 
         final TimeService timeService = mock( TimeService.class );
@@ -138,8 +140,8 @@ public class NodeShardAllocationTest {
         when( keyspace.prepareMutationBatch() ).thenReturn( batch );
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, nodeShardCounterSerialization, timeService,
-                        graphFig, keyspace );
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardCounterSerialization, timeService, graphFig, keyspace );
 
         final Id nodeId = createId( "test" );
         final String type = "type";
@@ -149,8 +151,8 @@ public class NodeShardAllocationTest {
          * Mock up returning an empty iterator, our audit shouldn't create a new shard
          */
         when( edgeShardSerialization
-                .getEdgeMetaData( same( scope ), same( nodeId ), eq( NodeType.SOURCE ), any( Optional.class ),  same( type ),
-                        same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
+                .getEdgeMetaData( same( scope ), same( nodeId ), eq( NodeType.SOURCE ), any( Optional.class ),
+                        same( type ), same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
 
         final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.SOURCE, type, subType );
 
@@ -158,19 +160,15 @@ public class NodeShardAllocationTest {
     }
 
 
-
-
-
     @Test
     public void existingFutureShardSameTime() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
-              final EdgeColumnFamilies edgeColumnFamilies = mock(EdgeColumnFamilies.class);
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
 
-              final ShardedEdgeSerialization shardedEdgeSerialization = mock(ShardedEdgeSerialization.class);
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
 
-        final NodeShardApproximation nodeShardCounterSerialization =
-                mock( NodeShardApproximation.class );
+        final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
 
 
         final TimeService timeService = mock( TimeService.class );
@@ -184,8 +182,8 @@ public class NodeShardAllocationTest {
 
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, nodeShardCounterSerialization, timeService,
-                        graphFig, keyspace );
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardCounterSerialization, timeService, graphFig, keyspace );
 
         final Id nodeId = createId( "test" );
         final String type = "type";
@@ -196,16 +194,16 @@ public class NodeShardAllocationTest {
 
         when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
 
-        final Shard futureShard =  new Shard(10000l, timeservicetime) ;
+        final Shard futureShard = new Shard( 10000l, timeservicetime );
 
         /**
          * Mock up returning a min shard, and a future shard
          */
         when( edgeShardSerialization
-                .getEdgeMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET), any( Optional.class ),  same( type ),
-                        same( subType ) ) ).thenReturn( Arrays.asList( futureShard ).iterator() );
+                .getEdgeMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
+                        same( type ), same( subType ) ) ).thenReturn( Arrays.asList( futureShard ).iterator() );
 
-        final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.TARGET,  type, subType );
+        final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.TARGET, type, subType );
 
         assertFalse( "No shard allocated", result );
     }
@@ -215,12 +213,11 @@ public class NodeShardAllocationTest {
     public void lowCountFutureShard() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
-              final EdgeColumnFamilies edgeColumnFamilies = mock(EdgeColumnFamilies.class);
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
 
-              final ShardedEdgeSerialization shardedEdgeSerialization = mock(ShardedEdgeSerialization.class);
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
 
-        final NodeShardApproximation nodeShardApproximation =
-                mock( NodeShardApproximation.class );
+        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
 
 
         final TimeService timeService = mock( TimeService.class );
@@ -233,8 +230,8 @@ public class NodeShardAllocationTest {
 
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, nodeShardApproximation, timeService,
-                        graphFig, keyspace );
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardApproximation, timeService, graphFig, keyspace );
 
         final Id nodeId = createId( "test" );
         final String type = "type";
@@ -250,15 +247,16 @@ public class NodeShardAllocationTest {
          * Mock up returning a min shard, and a future shard
          */
         when( edgeShardSerialization
-                .getEdgeMetaData( same( scope ), same( nodeId ), eq(NodeType.TARGET), any( Optional.class ),  same( type ),
-                        same( subType ) ) ).thenReturn( Arrays.asList( new Shard(0l, 0l) ).iterator() );
+                .getEdgeMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
+                        same( type ), same( subType ) ) ).thenReturn( Arrays.asList( new Shard( 0l, 0l ) ).iterator() );
 
 
         //return a shard size < our max by 1
 
         final long count = graphFig.getShardSize() - 1;
 
-        when( nodeShardApproximation.getCount(scope, nodeId, NodeType.TARGET, 0l, type, subType )).thenReturn( count );
+        when( nodeShardApproximation.getCount( scope, nodeId, NodeType.TARGET, 0l, type, subType ) )
+                .thenReturn( count );
 
         final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.TARGET, type, subType );
 
@@ -270,26 +268,25 @@ public class NodeShardAllocationTest {
     public void equalCountFutureShard() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
-        final EdgeColumnFamilies edgeColumnFamilies = mock(EdgeColumnFamilies.class);
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
 
-        final ShardedEdgeSerialization shardedEdgeSerialization = mock(ShardedEdgeSerialization.class);
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
 
-        final NodeShardApproximation nodeShardApproximation =
-                mock( NodeShardApproximation.class );
+        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
 
 
         final TimeService timeService = mock( TimeService.class );
 
         final Keyspace keyspace = mock( Keyspace.class );
 
-        final MutationBatch batch = mock(MutationBatch.class);
+        final MutationBatch batch = mock( MutationBatch.class );
 
-        when(keyspace.prepareMutationBatch()).thenReturn( batch );
+        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
 
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, nodeShardApproximation, timeService,
-                        graphFig, keyspace );
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardApproximation, timeService, graphFig, keyspace );
 
         final Id nodeId = createId( "test" );
         final String type = "type";
@@ -305,39 +302,38 @@ public class NodeShardAllocationTest {
          * Mock up returning a min shard
          */
         when( edgeShardSerialization
-                .getEdgeMetaData( same( scope ), same( nodeId ), eq(NodeType.SOURCE), any( Optional.class ),  same( type ),
-                        same( subType ) ) ).thenReturn( Arrays.asList( new Shard( 0l, 0l ) ).iterator() );
+                .getEdgeMetaData( same( scope ), same( nodeId ), eq( NodeType.SOURCE ), any( Optional.class ),
+                        same( type ), same( subType ) ) ).thenReturn( Arrays.asList( new Shard( 0l, 0l ) ).iterator() );
 
 
         final long shardCount = graphFig.getShardSize();
 
         //return a shard size equal to our max
-        when( nodeShardApproximation
-                .getCount(   scope , nodeId, NodeType.SOURCE, 0l,type , subType  ))
+        when( nodeShardApproximation.getCount( scope, nodeId, NodeType.SOURCE, 0l, type, subType ) )
                 .thenReturn( shardCount );
 
         ArgumentCaptor<Long> shardValue = ArgumentCaptor.forClass( Long.class );
         ArgumentCaptor<Long> timestampValue = ArgumentCaptor.forClass( Long.class );
 
 
-
-
         //mock up our mutation
         when( edgeShardSerialization
-                .writeEdgeMeta( same( scope ), same( nodeId ), eq(NodeType.SOURCE), shardValue.capture(), timestampValue.capture(), same( type ), same( subType ) ) )
+                .writeEdgeMeta( same( scope ), same( nodeId ), eq( NodeType.SOURCE ), shardValue.capture(),
+                        timestampValue.capture(), same( type ), same( subType ) ) )
                 .thenReturn( mock( MutationBatch.class ) );
 
 
-        final SimpleMarkedEdge returnedEdge = new SimpleMarkedEdge( nodeId, type, createId("subType"), 10005l, false );
-        final Iterator<MarkedEdge> edgeIterator = Collections.singleton( (MarkedEdge)returnedEdge ).iterator();
+        final SimpleMarkedEdge returnedEdge =
+                new SimpleMarkedEdge( nodeId, type, createId( "subType" ), 10005l, false );
+        final Iterator<MarkedEdge> edgeIterator = Collections.singleton( ( MarkedEdge ) returnedEdge ).iterator();
 
         //mock up returning the value
-        when( shardedEdgeSerialization.getEdgesFromSourceByTargetType( same( edgeColumnFamilies ), same( scope ),
-                any( SearchByIdType.class ), any(Iterator.class) )).thenReturn( edgeIterator );
+        when( shardedEdgeSerialization
+                .getEdgesFromSourceByTargetType( same( edgeColumnFamilies ), same( scope ), any( SearchByIdType.class ),
+                        any( Iterator.class ) ) ).thenReturn( edgeIterator );
 
 
-
-        final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.SOURCE,  type, subType );
+        final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.SOURCE, type, subType );
 
         assertTrue( "Shard allocated", result );
 
@@ -350,41 +346,37 @@ public class NodeShardAllocationTest {
         assertEquals( "Expected time service time", timeservicetime, savedTimestamp );
 
 
-
         //now check our max value was set
 
         final long savedShardPivot = shardValue.getValue();
 
-        assertEquals("Expected max value to be the same", returnedEdge.getTimestamp(), savedShardPivot);
+        assertEquals( "Expected max value to be the same", returnedEdge.getTimestamp(), savedShardPivot );
     }
 
 
-
-
     @Test
     public void futureCountShardCleanup() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
-        final EdgeColumnFamilies edgeColumnFamilies = mock(EdgeColumnFamilies.class);
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
 
-        final ShardedEdgeSerialization shardedEdgeSerialization = mock(ShardedEdgeSerialization.class);
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
 
-        final NodeShardApproximation nodeShardApproximation =
-                mock( NodeShardApproximation.class );
+        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
 
 
         final TimeService timeService = mock( TimeService.class );
 
         final Keyspace keyspace = mock( Keyspace.class );
 
-        final MutationBatch batch = mock(MutationBatch.class);
+        final MutationBatch batch = mock( MutationBatch.class );
 
-        when(keyspace.prepareMutationBatch()).thenReturn( batch );
+        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
 
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl(edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, nodeShardApproximation, timeService,
-                        graphFig, keyspace );
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardApproximation, timeService, graphFig, keyspace );
 
         final Id nodeId = createId( "test" );
         final String type = "type";
@@ -392,100 +384,132 @@ public class NodeShardAllocationTest {
 
 
         /**
-         * Use the time service to generate UUIDS
+         * Use the time service to generate timestamps
          */
-        final long timeservicetime = System.currentTimeMillis();
+        final long timeservicetime = 10000;
 
 
         when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
 
-        assertTrue("Shard cache mocked", graphFig.getShardCacheTimeout() > 0);
+        assertTrue( "Shard cache mocked", graphFig.getShardCacheTimeout() > 0 );
 
 
         /**
          * Simulates clock drift when 2 nodes create future shards near one another
          */
-        final long futureTime = timeService.getCurrentTime()  + 2 * graphFig.getShardCacheTimeout();
+        final long minDelta = graphFig.getShardMinDelta();
+
 
+        final Shard minShard = new Shard( 0l, 0l );
 
-        final Shard minShard = new Shard(0l, 0l);
+        //a shard that isn't our minimum, but exists after compaction
+        final Shard compactedShard = new Shard( 5000, 1000 );
 
         /**
-         * Simulate slow node
+         * Simulate different node time allocation
          */
 
-        //our second shard is the "oldest", and hence should be returned in the iterator.  Future shard 1 and 3 should be removed
-        final Shard futureShard1 = new Shard(futureTime - 1, timeservicetime+1000);
+        final long minTime = 10000;
+        //our second shard is the "oldest", and hence should be returned in the iterator.  Future shard 1 and 3
+        // should be removed
+
+        //this should get dropped, It's allocated after future shard2 even though the time is less
+        final Shard futureShard1 = new Shard( 10000, minTime + minDelta );
 
-        final Shard futureShard2 = new Shard(futureTime + 10000, timeservicetime);
+        //should get kept.
+        final Shard futureShard2 = new Shard( 10005, minTime );
 
-        final Shard futureShard3 = new Shard(futureShard2.getShardIndex() + 10000, timeservicetime+2000);
+        //should be removed
+        final Shard futureShard3 = new Shard( 10010, minTime + minDelta / 2 );
 
         /**
          * Mock up returning a min shard
          */
         when( edgeShardSerialization
-                .getEdgeMetaData( same( scope ), same( nodeId ), eq(NodeType.TARGET), any( Optional.class ), same( type ),
-                        same( subType ) ) ).thenReturn(
-                Arrays.asList( futureShard3, futureShard2, futureShard1, minShard ).iterator() );
-
+                .getEdgeMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
+                        same( type ), same( subType ) ) ).thenReturn(
+                Arrays.asList( futureShard3, futureShard2, futureShard1, compactedShard, minShard ).iterator() );
 
 
         ArgumentCaptor<Long> newLongValue = ArgumentCaptor.forClass( Long.class );
 
 
-
-
         //mock up our mutation
         when( edgeShardSerialization
-                .removeEdgeMeta( same( scope ), same( nodeId ), eq(NodeType.TARGET), newLongValue.capture(), same( type ), same( subType ) ) )
-                .thenReturn( mock( MutationBatch.class ) );
+                .removeEdgeMeta( same( scope ), same( nodeId ), eq( NodeType.TARGET ), newLongValue.capture(),
+                        same( type ), same( subType ) ) ).thenReturn( mock( MutationBatch.class ) );
 
 
-        final Iterator<Shard>
-                result = approximation.getShards( scope, nodeId, NodeType.TARGET, Optional.<Shard>absent(), type, subType );
+        final Iterator<ShardEntryGroup> result =
+                approximation.getShards( scope, nodeId, NodeType.TARGET, Optional.<Shard>absent(), type, subType );
 
 
         assertTrue( "Shards present", result.hasNext() );
 
-        assertEquals("Only single next shard returned", futureShard2,  result.next());
 
-        assertTrue("Shards present", result.hasNext());
+        ShardEntryGroup shardEntryGroup = result.next();
 
-        assertEquals("Previous shard present", 0l, result.next().getShardIndex());
+        assertEquals( "Future shard returned", futureShard1, shardEntryGroup.getMergeTarget() );
 
-        assertFalse("No shards left", result.hasNext());
 
-        /**
-         * Now we need to verify that both our mutations have been added
-         */
+        //now verify all 4 are in this group.  This is because the first shard (0,0) (n-1) may be the only shard other
+        //nodes see while we're rolling our state.  This means it should be read and merged from as well
 
-        List<Long> values = newLongValue.getAllValues();
+        Collection<Shard> writeShards = shardEntryGroup.getWriteShards();
 
-        assertEquals("2 values removed", 2,  values.size());
+        assertEquals( "Shard size as expected", 4, writeShards.size() );
 
-        assertEquals("Deleted Max Future", futureShard1.getShardIndex(), values.get( 0 ).longValue());
-        assertEquals("Deleted Next Future", futureShard3.getShardIndex(), values.get( 1 ).longValue());
+        assertTrue( writeShards.contains( futureShard1 ) );
+        assertTrue( writeShards.contains( futureShard2 ) );
+        assertTrue( writeShards.contains( futureShard3 ) );
+        assertTrue( writeShards.contains( compactedShard ) );
 
-    }
+
+        Collection<Shard> readShards = shardEntryGroup.getReadShards( minTime + minDelta );
+
+        assertEquals( "Shard size as expected", 4, readShards.size() );
+
+        assertTrue( readShards.contains( futureShard1 ) );
+        assertTrue( readShards.contains( futureShard2 ) );
+        assertTrue( readShards.contains( futureShard3 ) );
+        assertTrue( readShards.contains( compactedShard ) );
+
+
+        assertTrue( "Shards present", result.hasNext() );
+
+        shardEntryGroup = result.next();
+
+        writeShards = shardEntryGroup.getWriteShards();
+
+
+        assertTrue( "Previous shard present", writeShards.contains( minShard ) );
+
+
+        writeShards = shardEntryGroup.getReadShards( minTime + minDelta );
 
 
+        assertTrue( "Previous shard present", writeShards.contains( minShard ) );
+
+
+        assertFalse( "No shards left", result.hasNext() );
+    }
 
 
     @Test
     public void noShardsReturns() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
-              final EdgeColumnFamilies edgeColumnFamilies = mock(EdgeColumnFamilies.class);
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
 
-              final ShardedEdgeSerialization shardedEdgeSerialization = mock(ShardedEdgeSerialization.class);
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
 
-        final NodeShardApproximation nodeShardApproximation =
-                mock( NodeShardApproximation.class );
+        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
 
 
         final TimeService timeService = mock( TimeService.class );
 
+        when( timeService.getCurrentTime() ).thenReturn( 10000l );
+
         final Keyspace keyspace = mock( Keyspace.class );
 
         final MutationBatch batch = mock( MutationBatch.class );
@@ -493,8 +517,8 @@ public class NodeShardAllocationTest {
         when( keyspace.prepareMutationBatch() ).thenReturn( batch );
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, nodeShardApproximation, timeService,
-                        graphFig, keyspace );
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardApproximation, timeService, graphFig, keyspace );
 
         final Id nodeId = createId( "test" );
         final String type = "type";
@@ -504,15 +528,114 @@ public class NodeShardAllocationTest {
          * Mock up returning an empty iterator, our audit shouldn't create a new shard
          */
         when( edgeShardSerialization
-                .getEdgeMetaData( same( scope ), same( nodeId ), eq(NodeType.TARGET), any( Optional.class ),  same( type ),
-                        same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
+                .getEdgeMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
+                        same( type ), same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
+
+        final Iterator<ShardEntryGroup> result =
+                approximation.getShards( scope, nodeId, NodeType.TARGET, Optional.<Shard>absent(), type, subType );
+
+
+        ShardEntryGroup shardEntryGroup = result.next();
+
+        final Shard expected = new Shard( 0, 0 );
+
+        assertEquals( "Future shard returned", expected, shardEntryGroup.getMergeTarget() );
+
+
+        //now verify the default (0,0) shard is in this group.  It may be the only shard other nodes see while
+        //we're rolling our state.  This means it should be both written to and read from
+
+        Collection<Shard> writeShards = shardEntryGroup.getWriteShards();
 
-        final Iterator<Shard> result = approximation.getShards( scope, nodeId, NodeType.TARGET,  Optional.<Shard>absent(), type,
-                subType );
+        Collection<Shard> readShards = shardEntryGroup.getReadShards( 10000l );
+
+
+        assertTrue( "0 shard allocated", writeShards.contains( expected ) );
+
+        assertTrue( "0 shard allocated", readShards.contains( expected ) );
 
-        assertEquals("0 shard allocated", 0l, result.next().getShardIndex());
 
         assertFalse( "No shard allocated", result.hasNext() );
     }
 
+
+    @Test
+    public void invalidConfiguration() {
+        //uses its own GraphFig mock (not the shared one) so this test controls both the timeout and the delta
+        final GraphFig graphFig = mock( GraphFig.class );
+
+        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
+
+
+        /**
+         * Fix the current time at 100000 milliseconds
+         */
+        final TimeService timeService = mock( TimeService.class );
+
+        final long time = 100000l;
+
+        when( timeService.getCurrentTime() ).thenReturn( time );
+
+
+        final long cacheTimeout = 30000l;
+        //mock the timeout from the same constant the deltas below are derived from, so they cannot drift apart
+        when( graphFig.getShardCacheTimeout() ).thenReturn( cacheTimeout );
+
+        //just under twice the cache timeout; this test expects getMinTime() to reject it
+        final long tooSmallDelta = ( long ) ( ( cacheTimeout * 2 ) * .99 );
+
+        when( graphFig.getShardMinDelta() ).thenReturn( tooSmallDelta );
+
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch batch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+
+        NodeShardAllocation approximation =
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardApproximation, timeService, graphFig, keyspace );
+
+
+        /**
+         * Should throw an exception
+         */
+        try {
+            approximation.getMinTime();
+            fail( "Should have thrown a GraphRuntimeException" );
+        }
+        catch ( GraphRuntimeException expected ) {
+            //expected: the configured min delta is below twice the cache timeout
+        }
+
+        //now test something that passes: a delta of exactly twice the cache timeout
+
+        final long minDelta = cacheTimeout * 2;
+
+        when( graphFig.getShardMinDelta() ).thenReturn( minDelta );
+
+        long returned = approximation.getMinTime();
+
+        long expectedReturned = time - minDelta;
+
+        assertEquals( expectedReturned, returned );
+        //a delta larger than the minimum must be accepted as well
+        final long delta = cacheTimeout * 4;
+
+        when( graphFig.getShardMinDelta() ).thenReturn( delta );
+
+        returned = approximation.getMinTime();
+
+        expectedReturned = time - delta;
+
+        assertEquals( expectedReturned, returned );
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ecd603f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
new file mode 100644
index 0000000..4e23d83
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Unit tests for {@link ShardEntryGroup}: adding shards to a group and querying its deletion/merge/compaction state.
+ */
+public class ShardEntryGroupTest {
+    // A group holding a single shard: it is accepted, cannot be deleted, is the merge target, needs no compaction.
+    @Test
+    public void singleEntry() {
+
+        final long delta = 10000;
+
+        Shard rootShard = new Shard( 0, 0 );
+
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+        final boolean result = shardEntryGroup.addShard( rootShard );
+
+        assertTrue( "Shard added", result );
+
+        assertFalse( "Single shard cannot be deleted", shardEntryGroup.canBeDeleted( rootShard ) );
+
+        assertSame( "Same shard for merge target", rootShard, shardEntryGroup.getMergeTarget() );
+
+        assertFalse( "Merge cannot be run with a single shard", shardEntryGroup.needsCompaction( 0 ) );
+    }
+
+    // Shards allocated within the delta window; only the first shard is exercised so far (see TODO below).
+    @Test
+    public void allocatedWithinDelta() {
+
+        final long delta = 10000;
+
+        Shard firstShard = new Shard( 1000, 1000 );
+        // TODO(WIP): secondShard is not added to the group yet; the two-shard assertions still need to be written
+        Shard secondShard = new Shard( 1000, 1000 );
+
+
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+        final boolean result = shardEntryGroup.addShard( firstShard );
+
+        assertTrue( "Shard added", result );
+
+        assertFalse( "Single shard cannot be deleted", shardEntryGroup.canBeDeleted( firstShard ) );
+
+        assertSame( "Same shard for merge target", firstShard, shardEntryGroup.getMergeTarget() );
+
+        assertFalse( "Merge cannot be run with a single shard", shardEntryGroup.needsCompaction( 0 ) );
+    }
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ecd603f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
index 51448b7..2879d5b 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
@@ -454,6 +454,12 @@ public class NodeShardApproximationTest {
 
 
         @Override
+        public long getShardMinDelta() {
+            return 0;  // stub value; NOTE(review): presumably not read by the tests in this class - confirm
+        }
+
+
+        @Override
         public long getShardCacheSize() {
             return 0;  //To change body of implemented methods use File | Settings | File Templates.
         }