You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/11/03 21:47:59 UTC

usergrid git commit: Adds delta to avoid accidentally deleting a shard as it's allocated

Repository: usergrid
Updated Branches:
  refs/heads/USERGRID-909 817d7ffb4 -> 06d34dbce


Adds delta to avoid accidentally deleting a shard as it's allocated


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/06d34dbc
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/06d34dbc
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/06d34dbc

Branch: refs/heads/USERGRID-909
Commit: 06d34dbce75b2868649aea3b44c651ecd3799d30
Parents: 817d7ff
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Nov 3 13:47:52 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Nov 3 13:47:52 2015 -0700

----------------------------------------------------------------------
 .../impl/ScopedCacheSerializationImpl.java      | 29 ++++--------
 .../usergrid/persistence/graph/GraphFig.java    | 20 ++++++--
 .../persistence/graph/guice/GraphModule.java    |  2 +
 .../impl/shard/ShardEntryGroup.java             | 18 +++++--
 .../shard/impl/NodeShardAllocationImpl.java     | 49 +++++++++-----------
 .../shard/impl/ShardEntryGroupIterator.java     |  6 ++-
 .../graph/GraphManagerShardConsistencyIT.java   | 37 ++++++---------
 .../impl/shard/NodeShardAllocationTest.java     | 12 +++--
 .../impl/shard/NodeShardGroupSearchTest.java    | 14 +++---
 .../impl/shard/ShardEntryGroupTest.java         | 47 +++++++------------
 .../shard/impl/ShardEntryGroupIteratorTest.java |  6 +--
 stack/corepersistence/locks/pom.xml             |  8 ++--
 12 files changed, 121 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java b/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
index 1439bc5..bed84c6 100644
--- a/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
+++ b/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.PropertyAccessor;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.hash.Funnel;
 import com.google.common.hash.PrimitiveSink;
@@ -79,13 +80,15 @@ public class ScopedCacheSerializationImpl<K,V> implements ScopedCacheSerializati
     private static final int[] NUM_BUCKETS = {20};
 
     /** How to funnel keys for buckets */
-    private static final Funnel<String> MAP_KEY_FUNNEL = new Funnel<String>() {
+    private static final Funnel<String> MAP_KEY_FUNNEL =
+        ( Funnel<String> ) ( key, into ) -> into.putString(key, StringHashUtils.UTF8 );
+
+
+    /**
+     * One second gc grace since our columns expire
+     */
+    private static final int GC_GRACE = 1;
 
-        @Override
-        public void funnel( final String key, final PrimitiveSink into ) {
-            into.putString(key, StringHashUtils.UTF8);
-        }
-    };
 
     /**
      * Locator to get us all buckets
@@ -252,18 +255,6 @@ public class ScopedCacheSerializationImpl<K,V> implements ScopedCacheSerializati
     }
 
 
-    private class MutationBatchExec implements Callable<Void> {
-        private final MutationBatch myBatch;
-        private MutationBatchExec(MutationBatch batch) {
-            myBatch = batch;
-        }
-        @Override
-        public Void call() throws Exception {
-            myBatch.execute();
-            return null;
-        }
-    }
-
 
     private OperationResult<Void> executeBatch(MutationBatch batch) {
         try {
@@ -284,7 +275,7 @@ public class ScopedCacheSerializationImpl<K,V> implements ScopedCacheSerializati
                 BytesType.class.getSimpleName(),
                 BytesType.class.getSimpleName(),
                 BytesType.class.getSimpleName(),
-                MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
+                MultiTennantColumnFamilyDefinition.CacheOption.KEYS, Optional.of(GC_GRACE) );
 
         return Arrays.asList(scopedCache);
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 82dfe51..b209835 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -43,7 +43,6 @@ public interface GraphFig extends GuicyFig {
      *
      * You will want to set this value to no more than 2x tombstone_failure_threshold to avoid failures on read during
      * shard compaction.
-     *
      */
     String SHARD_SIZE = "usergrid.graph.shard.size";
 
@@ -68,6 +67,14 @@ public interface GraphFig extends GuicyFig {
 
     String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size";
 
+    /**
+     * The minimum amount of time than can occur (in millis) between shard allocation and deletion.
+     *
+     * Note that you should also pad this for node clock drift.  A good value for this would be 60 seconds, assuming you
+     * have NTP and your nodes are reasonably (< 1 second) synced
+     */
+    String SHARD_DELETE_DELTA = "usergrid.graph.shard.delete.delta";
+
 
     @Default( "1000" )
     @Key( SCAN_PAGE_SIZE )
@@ -80,9 +87,9 @@ public interface GraphFig extends GuicyFig {
 
 
     /**
-     * A 1% repair chance.  On average we'll check to repair on 1 out of every 100 reads
+     * A 2% repair chance.  On average we'll check to repair on 2 out of every 100 reads
      */
-    @Default( ".01" )
+    @Default( ".02" )
     @Key( SHARD_REPAIR_CHANCE )
     double getShardRepairChance();
 
@@ -104,6 +111,11 @@ public interface GraphFig extends GuicyFig {
     @Key( COUNTER_WRITE_FLUSH_QUEUE_SIZE )
     int getCounterFlushQueueSize();
 
+    @Default( "60000" )
+    @Key( SHARD_DELETE_DELTA )
+    long getShardDeleteDelta();
+
+
     @Default( "CL_EACH_QUORUM" )
     @Key( SHARD_WRITE_CONSISTENCY )
     String getShardWriteConsistency();
@@ -114,7 +126,5 @@ public interface GraphFig extends GuicyFig {
     @Default( "CL_LOCAL_QUORUM" )
     @Key( SHARD_READ_CONSISTENCY )
     String getShardReadConsistency();
-
-
 }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 752f417..067f33a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -64,10 +64,12 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroup
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupDeletion;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerializationImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.AsyncTaskExecutorImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeShardSerializationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardGroupSearchImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupDeletionImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeColumnFamilies;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeShardStrategy;
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
index c33e0d8..1379209 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -43,6 +43,10 @@ public class ShardEntryGroup {
 
     private List<Shard> shards;
 
+    private final long delta;
+
+    private long maxCreatedTime;
+
     private Shard compactionTarget;
 
     private Shard rootShard;
@@ -51,7 +55,9 @@ public class ShardEntryGroup {
     /**
      * The max delta we accept in milliseconds for create time to be considered a member of this group
      */
-    public ShardEntryGroup() {
+    public ShardEntryGroup( final long delta ) {
+        Preconditions.checkArgument( delta > 0, "delta must be greater than 0" );
+        this.delta = delta;
         this.shards = new ArrayList<>();
     }
 
@@ -101,6 +107,8 @@ public class ShardEntryGroup {
     private void addShardInternal( final Shard shard ) {
         shards.add( shard );
 
+        maxCreatedTime = Math.max( maxCreatedTime, shard.getCreatedTime() );
+
         //we're changing our structure, unset the compaction target
         compactionTarget = null;
     }
@@ -293,6 +301,7 @@ public class ShardEntryGroup {
          * We don't have enough shards to compact, ignore
          */
         return getCompactionTarget() != null;
+
     }
 
 
@@ -346,8 +355,11 @@ public class ShardEntryGroup {
     @Override
     public String toString() {
         return "ShardEntryGroup{" +
-            "shards=" + shards +
-            ", compactionTarget=" + compactionTarget +
+            "compactionTarget=" + compactionTarget +
+            ", shards=" + shards +
+            ", delta=" + delta +
+            ", maxCreatedTime=" + maxCreatedTime +
+            ", rootShard=" + rootShard +
             '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 8f9e27f..436fd74 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -74,8 +74,9 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
     @Inject
     public NodeShardAllocationImpl( final EdgeShardSerialization edgeShardSerialization,
                                     final EdgeColumnFamilies edgeColumnFamilies,
-                                    final ShardedEdgeSerialization shardedEdgeSerialization, final TimeService timeService,
-                                    final GraphFig graphFig, final ShardGroupCompaction shardGroupCompaction) {
+                                    final ShardedEdgeSerialization shardedEdgeSerialization,
+                                    final TimeService timeService, final GraphFig graphFig,
+                                    final ShardGroupCompaction shardGroupCompaction ) {
         this.edgeShardSerialization = edgeShardSerialization;
         this.edgeColumnFamilies = edgeColumnFamilies;
         this.shardedEdgeSerialization = shardedEdgeSerialization;
@@ -100,26 +101,26 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
             existingShards = edgeShardSerialization.getShardMetaDataLocal( scope, maxShardId, directedEdgeMeta );
         }
 
-            /**
-             * We didn't get anything out of cassandra, so we need to create the minumum shard
-             */
-            if ( existingShards == null || !existingShards.hasNext() ) {
-
+        /**
+         * We didn't get anything out of cassandra, so we need to create the minumum shard
+         */
+        if ( existingShards == null || !existingShards.hasNext() ) {
 
-                final MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, Shard.MIN_SHARD, directedEdgeMeta );
-                try {
-                    batch.execute();
-                }
-                catch ( ConnectionException e ) {
-                    throw new RuntimeException( "Unable to connect to casandra", e );
-                }
 
-                existingShards = Collections.singleton( Shard.MIN_SHARD ).iterator();
+            final MutationBatch batch =
+                edgeShardSerialization.writeShardMeta( scope, Shard.MIN_SHARD, directedEdgeMeta );
+            try {
+                batch.execute();
             }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to connect to casandra", e );
+            }
+
+            existingShards = Collections.singleton( Shard.MIN_SHARD ).iterator();
         }
 
 
-        return new ShardEntryGroupIterator( existingShards, shardGroupCompaction, scope, directedEdgeMeta );
+        return new ShardEntryGroupIterator( existingShards, graphFig.getShardDeleteDelta(), shardGroupCompaction, scope, directedEdgeMeta );
     }
 
 
@@ -171,7 +172,6 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
         }
 
 
-
         final long shardSize = graphFig.getShardSize();
 
 
@@ -203,8 +203,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
 
         if ( !edges.hasNext() ) {
-            logger.debug(
-                "Tried to allocate a new shard for group {}, but no max value could be found in that row",
+            logger.debug( "Tried to allocate a new shard for group {}, but no max value could be found in that row",
                 readShards );
             return false;
         }
@@ -225,7 +224,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
             //we hit a pivot shard, set it since it could be the last one we encounter
             if ( i % shardSize == 0 ) {
                 marked = edges.next();
-                logger.debug("Found an edge {} to split at index {}", marked, i);
+                logger.debug( "Found an edge {} to split at index {}", marked, i );
             }
             else {
                 edges.next();
@@ -233,8 +232,6 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
         }
 
 
-
-
         /**
          * Sanity check in case our counters become severely out of sync with our edge state in cassandra.
          */
@@ -323,13 +320,13 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
             return Collections.<ShardEntryGroup>emptyList().iterator();
         }
 
-        return new ShardEntryGroupIterator( shards, NO_OP_COMPACTION, scope, directedEdgeMeta );
+        return new ShardEntryGroupIterator( shards, graphFig.getShardDeleteDelta(), NO_OP_COMPACTION, scope, directedEdgeMeta );
     }
 
 
     /**
-     * Class that just ignores compaction events, since we're already evaluating the events.  A bit of a hack
-     * that shows we need some refactoring
+     * Class that just ignores compaction events, since we're already evaluating the events.  A bit of a hack that shows
+     * we need some refactoring
      */
     private final static class NoOpCompaction implements ShardGroupCompaction {
 
@@ -342,6 +339,4 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
             return Futures.immediateFuture( AuditResult.NOT_CHECKED );
         }
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
index cbe35b6..3a1c4d8 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
@@ -26,6 +26,7 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
 
     private final ShardGroupCompaction shardGroupCompaction;
     private final PushbackIterator<Shard> sourceIterator;
+    private final long minDelta;
     private final ApplicationScope scope;
     private final DirectedEdgeMeta directedEdgeMeta;
 
@@ -39,7 +40,7 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
      * @param shardIterator The iterator of all shards.  Order is expected to be by the  shard index from Long.MAX to
      * Long.MIN
      */
-    public ShardEntryGroupIterator( final Iterator<Shard> shardIterator,
+    public ShardEntryGroupIterator( final Iterator<Shard> shardIterator, final long minDelta,
                                     final ShardGroupCompaction shardGroupCompaction, final ApplicationScope scope,
                                     final DirectedEdgeMeta directedEdgeMeta ) {
 
@@ -49,6 +50,7 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
         this.directedEdgeMeta = directedEdgeMeta;
         this.sourceIterator = new PushbackIterator( shardIterator );
         this.shardGroupCompaction = shardGroupCompaction;
+        this.minDelta = minDelta;
     }
 
 
@@ -95,7 +97,7 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
         while ( sourceIterator.hasNext() ) {
 
             if ( next == null ) {
-                next = new ShardEntryGroup( );
+                next = new ShardEntryGroup( minDelta);
             }
 
             final Shard shard = sourceIterator.next();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index f16cc78..2ac67aa 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -54,8 +54,11 @@ import org.apache.usergrid.persistence.core.util.IdGenerator;
 import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardGroupSearch;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.codahale.metrics.Meter;
@@ -86,9 +89,6 @@ public class GraphManagerShardConsistencyIT {
 
     private static final Meter writeMeter = registry.meter( "writeThroughput" );
 
-    private static final Slf4jReporter reporter =
-        Slf4jReporter.forRegistry( registry ).outputTo( log ).convertRatesTo( TimeUnit.SECONDS )
-                     .convertDurationsTo( TimeUnit.MILLISECONDS ).build();
     private Slf4jReporter reporter;
 
 
@@ -97,10 +97,6 @@ public class GraphManagerShardConsistencyIT {
 
     protected Object originalShardSize;
 
-    protected Object originalShardTimeout;
-
-    protected Object originalShardDelta;
-
     protected ListeningExecutorService executor;
 
 
@@ -131,10 +127,14 @@ public class GraphManagerShardConsistencyIT {
 
     @After
     public void tearDown() {
-        reporter.stop();
-        reporter.report();
+        if(reporter != null) {
+            reporter.stop();
+            reporter.report();
+        }
 
-        executor.shutdownNow();
+        if(executor != null) {
+            executor.shutdownNow();
+        }
     }
 
 
@@ -373,8 +373,6 @@ public class GraphManagerShardConsistencyIT {
 
             Thread.sleep( 2000 );
         }
-
-        executor.shutdownNow();
     }
 
 
@@ -484,10 +482,6 @@ public class GraphManagerShardConsistencyIT {
         final AtomicLong writeCounter = new AtomicLong();
 
 
-        //min stop time the min delta + 1 cache cycle timeout
-        final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout();
-
-
         log.info( "Writing {} edges per worker on {} workers in {} injectors", workerWriteLimit, numWorkersPerInjector,
             numInjectors );
 
@@ -498,10 +492,9 @@ public class GraphManagerShardConsistencyIT {
         for ( Injector injector : injectors ) {
             final GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class );
 
-
             for ( int i = 0; i < numWorkersPerInjector; i++ ) {
                 Future<Boolean> future =
-                    executor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
+                    executor.submit( new Worker( gmf, generator, workerWriteLimit, writeCounter ) );
 
                 futures.add( future );
             }
@@ -515,7 +508,7 @@ public class GraphManagerShardConsistencyIT {
         }
 
         //now get all our shards
-        final NodeShardCache cache = getInstance( injectors, NodeShardCache.class );
+        final NodeShardGroupSearch nodeShardGroupSearch = getInstance( injectors, NodeShardGroupSearch.class );
 
         final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, edgeType );
 
@@ -531,7 +524,7 @@ public class GraphManagerShardConsistencyIT {
 
 
         final Iterator<ShardEntryGroup> existingShardGroups =
-            cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
+            nodeShardGroupSearch.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
         int shardCount = 0;
 
         while ( existingShardGroups.hasNext() ) {
@@ -621,7 +614,7 @@ public class GraphManagerShardConsistencyIT {
             shardCount = 0;
 
             //we have to get it from the cache, because this will trigger the compaction process
-            final Iterator<ShardEntryGroup> groups = cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
+            final Iterator<ShardEntryGroup> groups = nodeShardGroupSearch.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
 
             ShardEntryGroup group = null;
 
@@ -759,7 +752,7 @@ public class GraphManagerShardConsistencyIT {
         /**
          * Perform the search returning an observable edge
          */
-        Observable<Edge> doSearch( final GraphManager manager );
+        Observable<MarkedEdge> doSearch( final GraphManager manager );
     }
 }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index 45047f1..ee2fb19 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -82,6 +82,8 @@ public class NodeShardAllocationTest {
 
         when( graphFig.getShardSize() ).thenReturn( 20000l );
 
+        when(graphFig.getShardDeleteDelta()).thenReturn( 60000l );
+
     }
 
 
@@ -114,7 +116,7 @@ public class NodeShardAllocationTest {
         final Shard firstShard = new Shard( 0l, 0l, true );
         final Shard futureShard = new Shard( 10000l, timeservicetime, false );
 
-        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup(10000l  );
         shardEntryGroup.addShard( futureShard );
         shardEntryGroup.addShard( firstShard );
 
@@ -160,7 +162,7 @@ public class NodeShardAllocationTest {
 
         final Shard futureShard = new Shard( 10000l, timeservicetime, true );
 
-        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup(10000l );
         shardEntryGroup.addShard( futureShard );
 
 
@@ -209,7 +211,7 @@ public class NodeShardAllocationTest {
 
         final Shard firstShard = new Shard( 0l, 0l, true );
 
-        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup(  );
+        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup(10000l  );
         shardEntryGroup.addShard( firstShard );
 
         final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( nodeId, type, subType );
@@ -323,7 +325,7 @@ public class NodeShardAllocationTest {
 
         final Shard firstShard = new Shard( 0l, 0l, true );
 
-        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup(10000l );
         shardEntryGroup.addShard( firstShard );
 
         final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( nodeId, type, subType );
@@ -413,7 +415,7 @@ public class NodeShardAllocationTest {
 
         final Shard firstShard = new Shard( 0l, 0l, true );
 
-        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup(  );
+        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup(10000l  );
         shardEntryGroup.addShard( firstShard );
 
         final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( nodeId, type, subType );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardGroupSearchTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardGroupSearchTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardGroupSearchTest.java
index c9fe4a4..910dde0 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardGroupSearchTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardGroupSearchTest.java
@@ -94,7 +94,7 @@ public class NodeShardGroupSearchTest {
         final Optional max = Optional.absent();
 
 
-        final ShardEntryGroup group = new ShardEntryGroup();
+        final ShardEntryGroup group = new ShardEntryGroup(10000l);
         group.addShard( new Shard( 0, 0, true ) );
 
 
@@ -168,14 +168,14 @@ public class NodeShardGroupSearchTest {
         /**
          * Simulate returning all shards
          */
-        final ShardEntryGroup minShardGroup = new ShardEntryGroup();
+        final ShardEntryGroup minShardGroup = new ShardEntryGroup(10000l);
         minShardGroup.addShard( minShard );
 
-        final ShardEntryGroup midShardGroup = new ShardEntryGroup();
+        final ShardEntryGroup midShardGroup = new ShardEntryGroup(10000l);
         midShardGroup.addShard( midShard );
 
 
-        final ShardEntryGroup maxShardGroup = new ShardEntryGroup();
+        final ShardEntryGroup maxShardGroup = new ShardEntryGroup(10000l);
         maxShardGroup.addShard( maxShard );
 
 
@@ -211,7 +211,7 @@ public class NodeShardGroupSearchTest {
 
             //use "thenAnswer" so we always return the value, even if  it's invoked more than 1 time.
             .thenAnswer( answer -> new ShardEntryGroupIterator( Collections.singleton( minShard ).iterator(),
-                NoOpShardCompaction.INSTANCE, scope, directedEdgeMeta ) );
+                10000l, NoOpShardCompaction.INSTANCE, scope, directedEdgeMeta ) );
 
 
         /**
@@ -243,7 +243,7 @@ public class NodeShardGroupSearchTest {
 
             //use "thenAnswer" so we always return the value, even if  it's invoked more than 1 time.
             .thenAnswer( answer -> new ShardEntryGroupIterator( Arrays.asList( midShard, minShard ).iterator(),
-                NoOpShardCompaction.INSTANCE, scope, directedEdgeMeta ) );
+                10000l, NoOpShardCompaction.INSTANCE, scope, directedEdgeMeta ) );
 
 
         /**
@@ -276,7 +276,7 @@ public class NodeShardGroupSearchTest {
             //use "thenAnswer" so we always return the value, even if  it's invoked more than 1 time.
             .thenAnswer(
                 answer -> new ShardEntryGroupIterator( Arrays.asList( maxShard, midShard, minShard ).iterator(),
-                    NoOpShardCompaction.INSTANCE, scope, directedEdgeMeta ) );
+                    10000l, NoOpShardCompaction.INSTANCE, scope, directedEdgeMeta ) );
 
 
         //check getting equal to our min, mid and max

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
index d609d96..d9f8a2b 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
@@ -40,7 +40,7 @@ public class ShardEntryGroupTest {
 
         Shard rootShard = new Shard( 0, 0, false );
 
-        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l );
 
         final boolean result = shardEntryGroup.addShard( rootShard );
 
@@ -62,7 +62,7 @@ public class ShardEntryGroupTest {
         Shard secondShard = new Shard( 1000, 1001, false );
 
 
-        ShardEntryGroup shardEntryGroup = new ShardEntryGroup(  );
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l );
 
         boolean result = shardEntryGroup.addShard( secondShard );
 
@@ -73,7 +73,6 @@ public class ShardEntryGroupTest {
         assertTrue( "Shard added", result );
 
 
-
         assertFalse( "First shard cannot be deleted", shardEntryGroup.canBeDeleted( firstShard ) );
 
         assertFalse( "Second shard cannot be deleted", shardEntryGroup.canBeDeleted( secondShard ) );
@@ -81,9 +80,6 @@ public class ShardEntryGroupTest {
         assertFalse( "Duplicate shard id cannot be deleted", shardEntryGroup.canBeDeleted( secondShard ) );
 
         assertNull( "Can't compact, no min compacted shard present", shardEntryGroup.getCompactionTarget() );
-
-
-
     }
 
 
@@ -97,7 +93,7 @@ public class ShardEntryGroupTest {
         Shard secondShard = new Shard( 1000, 1001, false );
 
 
-        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l );
 
         boolean result = shardEntryGroup.addShard( secondShard );
 
@@ -123,11 +119,9 @@ public class ShardEntryGroupTest {
 
         //we should compact these
         assertTrue( "Merge should be run", shardEntryGroup.shouldCompact() );
-
     }
 
 
-
     @Test
     public void lowerTimestampHigherShard() {
 
@@ -139,7 +133,7 @@ public class ShardEntryGroupTest {
         Shard compactedShard = new Shard( 500, 200, true );
 
 
-        ShardEntryGroup shardEntryGroup = new ShardEntryGroup(  );
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l );
 
         boolean result = shardEntryGroup.addShard( firstShard );
 
@@ -154,19 +148,15 @@ public class ShardEntryGroupTest {
         assertTrue( "Shard added", result );
 
 
+        assertFalse( "First shard cannot be deleted", shardEntryGroup.canBeDeleted( secondShard ) );
 
-        assertFalse( "First shard cannot be deleted", shardEntryGroup.canBeDeleted( secondShard  ) );
-
-        assertTrue( "Second shard can be deleted", shardEntryGroup.canBeDeleted( firstShard  ) );
-
-        assertEquals( "Can't compact, no min compacted shard present", secondShard, shardEntryGroup.getCompactionTarget() );
-
-
+        assertTrue( "Second shard can be deleted", shardEntryGroup.canBeDeleted( firstShard ) );
 
+        assertEquals( "Can't compact, no min compacted shard present", secondShard,
+            shardEntryGroup.getCompactionTarget() );
     }
 
 
-
     @Test
     public void multipleShardGroups() {
 
@@ -179,7 +169,7 @@ public class ShardEntryGroupTest {
         Shard compactedShard2 = new Shard( 800, 7000, true );
 
 
-        ShardEntryGroup shardEntryGroup = new ShardEntryGroup(  );
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l );
 
         boolean result = shardEntryGroup.addShard( firstShard );
 
@@ -197,7 +187,7 @@ public class ShardEntryGroupTest {
 
         assertFalse( "Shouldn't add since it's compacted", result );
 
-        ShardEntryGroup secondGroup = new ShardEntryGroup(  );
+        ShardEntryGroup secondGroup = new ShardEntryGroup( 10000l );
 
         result = secondGroup.addShard( compactedShard2 );
 
@@ -214,7 +204,7 @@ public class ShardEntryGroupTest {
         Shard compactedShard1 = new Shard( 900, 8000, true );
 
 
-        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l );
 
         boolean result = shardEntryGroup.addShard( firstShard );
 
@@ -236,9 +226,7 @@ public class ShardEntryGroupTest {
         assertEquals( "Same shard for merge target", secondShard, shardEntryGroup.getCompactionTarget() );
 
         //Should return true, we can merge
-        assertTrue( "Merge cannot be run within min time",
-                shardEntryGroup.shouldCompact() );
-
+        assertTrue( "Merge cannot be run within min time", shardEntryGroup.shouldCompact() );
     }
 
 
@@ -255,7 +243,7 @@ public class ShardEntryGroupTest {
         Shard compactedShard1 = new Shard( 900, 8000, true );
 
 
-        ShardEntryGroup shardEntryGroup = new ShardEntryGroup(  );
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l );
 
         boolean result = shardEntryGroup.addShard( firstShard );
 
@@ -292,7 +280,7 @@ public class ShardEntryGroupTest {
         Shard compactedShard = new Shard( 900, 8000, true );
 
 
-        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l );
 
         boolean result = shardEntryGroup.addShard( ignoredProposedShard );
 
@@ -307,7 +295,7 @@ public class ShardEntryGroupTest {
         assertTrue( "Shard added", result );
 
 
-        Collection<Shard> writeShards = shardEntryGroup.getWriteShards(newAllocatedCompactionTarget.getShardIndex() );
+        Collection<Shard> writeShards = shardEntryGroup.getWriteShards( newAllocatedCompactionTarget.getShardIndex() );
 
         assertEquals( "Shard size correct", 1, writeShards.size() );
 
@@ -319,7 +307,6 @@ public class ShardEntryGroupTest {
         assertEquals( "Shard size correct", 1, writeShards.size() );
 
         assertTrue( "Lowest new shard present", writeShards.contains( compactedShard ) );
-
     }
 
 
@@ -332,7 +319,7 @@ public class ShardEntryGroupTest {
 
         Shard rootShard = new Shard( 0, 0, false );
 
-        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l );
 
         boolean result = shardEntryGroup.addShard( secondShard );
 
@@ -357,7 +344,7 @@ public class ShardEntryGroupTest {
 
         Shard lowShard = new Shard( 10000, 1000, false );
 
-        ShardEntryGroup shardEntryGroup = new ShardEntryGroup(  );
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l );
 
         boolean result = shardEntryGroup.addShard( highShard );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
index 6a0e1e5..0ab60a3 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
@@ -59,7 +59,7 @@ public class ShardEntryGroupIteratorTest {
         final Iterator<Shard> noShards = Collections.<Shard>emptyList().iterator();
 
         //should blow up, our iterator is empty
-        new ShardEntryGroupIterator( noShards, shardGroupCompaction, scope, directedEdgeMeta );
+        new ShardEntryGroupIterator( noShards, 10000l,  shardGroupCompaction, scope, directedEdgeMeta );
     }
 
 
@@ -77,7 +77,7 @@ public class ShardEntryGroupIteratorTest {
         final Iterator<Shard> noShards = Collections.singleton( minShard ).iterator();
 
         ShardEntryGroupIterator entryGroupIterator =
-            new ShardEntryGroupIterator( noShards, shardGroupCompaction, scope, directedEdgeMeta );
+            new ShardEntryGroupIterator( noShards, 10000l, shardGroupCompaction, scope, directedEdgeMeta );
 
 
         assertTrue( "Root shard always present", entryGroupIterator.hasNext() );
@@ -154,7 +154,7 @@ public class ShardEntryGroupIteratorTest {
 
 
         ShardEntryGroupIterator entryGroupIterator =
-            new ShardEntryGroupIterator( noShards, shardGroupCompaction, scope, directedEdgeMeta );
+            new ShardEntryGroupIterator( noShards, 10000l, shardGroupCompaction, scope, directedEdgeMeta );
 
         assertTrue( "max group present", entryGroupIterator.hasNext() );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/locks/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/locks/pom.xml b/stack/corepersistence/locks/pom.xml
index de89053..6f6f77a 100644
--- a/stack/corepersistence/locks/pom.xml
+++ b/stack/corepersistence/locks/pom.xml
@@ -20,10 +20,10 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>persistence</artifactId>
-        <groupId>org.apache.usergrid</groupId>
-        <version>2.1.0-SNAPSHOT</version>
-    </parent>
+       <artifactId>persistence</artifactId>
+       <groupId>org.apache.usergrid</groupId>
+       <version>2.1.1-SNAPSHOT</version>
+     </parent>
     <modelVersion>4.0.0</modelVersion>
 
     <description>The module for handling all distributed locks</description>