You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/03/23 18:34:45 UTC

[17/20] usergrid git commit: Fix issue where new shards were not picked up after new shards are allocated.

Fix issue where newly allocated shards were not picked up by reads: the node shard cache is now invalidated after shard allocation and after shard compaction.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/97719684
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/97719684
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/97719684

Branch: refs/heads/release-2.1.1
Commit: 9771968408658e8ce4c14db305bc10fc173d65d8
Parents: 85cd12d
Author: Michael Russo <mr...@apigee.com>
Authored: Tue Mar 22 18:16:44 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Tue Mar 22 18:16:44 2016 -0700

----------------------------------------------------------------------
 .../astyanax/MultiRowShardColumnIterator.java   |  2 +-
 .../impl/shard/NodeShardCache.java              |  7 +++-
 .../shard/impl/EdgeShardSerializationImpl.java  |  1 +
 .../shard/impl/NodeShardAllocationImpl.java     | 23 +++++-----
 .../impl/shard/impl/NodeShardCacheImpl.java     | 11 +++++
 .../shard/impl/ShardGroupCompactionImpl.java    | 23 +++++-----
 .../graph/GraphManagerShardConsistencyIT.java   | 28 +++++++++----
 .../impl/shard/NodeShardAllocationTest.java     | 44 ++++++++++++++++----
 .../impl/shard/ShardGroupCompactionTest.java    |  4 +-
 .../graph/src/test/resources/log4j.properties   |  4 +-
 10 files changed, 103 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
index b13d0f5..86e3b4d 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
@@ -43,7 +43,7 @@ import com.netflix.astyanax.util.RangeBuilder;
  */
 public class MultiRowShardColumnIterator<R, C, T> implements Iterator<T> {
 
-    private static final Logger logger = LoggerFactory.getLogger( MultiRowColumnIterator.class );
+    private static final Logger logger = LoggerFactory.getLogger( MultiRowShardColumnIterator.class );
 
     private final int pageSize;
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
index 173b89d..23c2c25 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
@@ -38,7 +38,7 @@ public interface NodeShardCache {
      * @param timestamp The time to select the slice for.
      * @param directedEdgeMeta The directed edge meta data
      */
-    public ShardEntryGroup getWriteShardGroup( final ApplicationScope scope,
+    ShardEntryGroup getWriteShardGroup( final ApplicationScope scope,
                                                final long timestamp, final DirectedEdgeMeta directedEdgeMeta );
 
     /**
@@ -49,6 +49,9 @@ public interface NodeShardCache {
      * @param directedEdgeMeta The directed edge meta data
      * @return
      */
-    public Iterator<ShardEntryGroup> getReadShardGroup( final ApplicationScope scope, final long maxTimestamp, final DirectedEdgeMeta directedEdgeMeta  );
+    Iterator<ShardEntryGroup> getReadShardGroup( final ApplicationScope scope, final long maxTimestamp, final DirectedEdgeMeta directedEdgeMeta  );
+
+
+    void invalidate();
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
index 5eeeae0..76a0922 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
@@ -39,6 +39,7 @@ import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeShardRowKeySerializer;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.ShardSerializer;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 6b190a1..a6cf378 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 import java.util.Collections;
 import java.util.Iterator;
 
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,14 +34,6 @@ import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 
 import com.google.common.base.Optional;
@@ -65,19 +58,22 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
     private final TimeService timeService;
     private final GraphFig graphFig;
     private final ShardGroupCompaction shardGroupCompaction;
+    private final NodeShardCache nodeShardCache;
 
 
     @Inject
     public NodeShardAllocationImpl( final EdgeShardSerialization edgeShardSerialization,
                                     final EdgeColumnFamilies edgeColumnFamilies,
                                     final ShardedEdgeSerialization shardedEdgeSerialization, final TimeService timeService,
-                                    final GraphFig graphFig, final ShardGroupCompaction shardGroupCompaction ) {
+                                    final GraphFig graphFig, final ShardGroupCompaction shardGroupCompaction,
+                                    final NodeShardCache nodeShardCache) {
         this.edgeShardSerialization = edgeShardSerialization;
         this.edgeColumnFamilies = edgeColumnFamilies;
         this.shardedEdgeSerialization = shardedEdgeSerialization;
         this.timeService = timeService;
         this.graphFig = graphFig;
         this.shardGroupCompaction = shardGroupCompaction;
+        this.nodeShardCache = nodeShardCache;
     }
 
 
@@ -101,7 +97,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
             //logger.info("existing shards has something: {}", existingShards.hasNext());
 
             /**
-             * We didn't get anything out of cassandra, so we need to create the minumum shard
+             * We didn't get anything out of cassandra, so we need to create the minimum shard
              */
             if ( existingShards == null || !existingShards.hasNext() ) {
 
@@ -250,6 +246,13 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
         try {
             batch.execute();
+
+            if(logger.isTraceEnabled()) {
+                logger.trace("Clearing shard cache");
+            }
+
+            // invalidate the shard cache so we can be sure that all read shards are up to date
+            nodeShardCache.invalidate();
         }
         catch ( ConnectionException e ) {
             throw new RuntimeException( "Unable to connect to casandra", e );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index 1a88ebb..5eaaaa0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import com.google.inject.Singleton;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,6 +69,7 @@ import com.google.inject.Inject;
  * Simple implementation of the shard.  Uses a local Guava shard with a timeout.  If a value is not present in the
  * shard, it will need to be searched via cassandra.
  */
+@Singleton
 public class NodeShardCacheImpl implements NodeShardCache {
 
     private static final Logger logger = LoggerFactory.getLogger( NodeShardCacheImpl.class );
@@ -171,6 +173,9 @@ public class NodeShardCacheImpl implements NodeShardCache {
             throw new GraphRuntimeException( "Unable to load shard key for graph", e );
         }
 
+        // do this if wanting to bypass the cache for getting the read shards
+        //entry = new CacheEntry(nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta ));
+
         Iterator<ShardEntryGroup> iterator = entry.getShards( maxTimestamp );
 
         if ( iterator == null ) {
@@ -180,6 +185,12 @@ public class NodeShardCacheImpl implements NodeShardCache {
         return iterator;
     }
 
+    @Override
+    public void invalidate(){
+
+        graphs.invalidateAll();
+
+    }
 
     /**
      * This is a race condition.  We could re-init the shard while another thread is reading it.  This is fine, the read

http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index b88c52c..7854c3b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -27,15 +27,11 @@ import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nullable;
 
 import com.google.common.base.Optional;
-import com.netflix.astyanax.connectionpool.OperationResult;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,8 +58,6 @@ import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import rx.Observable;
-import rx.schedulers.Schedulers;
 
 
 /**
@@ -94,6 +88,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
     private final Random random;
     private final ShardCompactionTaskTracker shardCompactionTaskTracker;
     private final ShardAuditTaskTracker shardAuditTaskTracker;
+    private final NodeShardCache nodeShardCache;
 
 
     @Inject
@@ -102,7 +97,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                                      final ShardedEdgeSerialization shardedEdgeSerialization,
                                      final EdgeColumnFamilies edgeColumnFamilies, final Keyspace keyspace,
                                      final EdgeShardSerialization edgeShardSerialization,
-                                     final AsyncTaskExecutor asyncTaskExecutor) {
+                                     final AsyncTaskExecutor asyncTaskExecutor,
+                                     final NodeShardCache nodeShardCache ) {
 
         this.timeService = timeService;
         this.countAudits = new AtomicLong();
@@ -119,6 +115,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
 
         this.taskExecutor = asyncTaskExecutor.getExecutorService();
+        this.nodeShardCache = nodeShardCache;
     }
 
 
@@ -319,7 +316,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         if ( totalEdgeCount == 0 ) {
 
 
-            //now that we've marked our target as compacted, we can successfully remove any shards that are not
+            // now that we've marked our target as compacted, we can successfully remove any shards that are not
             // compacted themselves in the sources
 
             final MutationBatch shardRemovalRollup = keyspace.prepareMutationBatch();
@@ -342,6 +339,9 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
             try {
                 shardRemovalRollup.execute();
+
+                // invalidate the shard cache so we can be sure that all read shards are up to date
+                nodeShardCache.invalidate();
             }
             catch ( ConnectionException e ) {
                 throw new RuntimeException( "Unable to connect to cassandra", e );
@@ -357,6 +357,9 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
             final MutationBatch updateMark = edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta );
             try {
                 updateMark.execute();
+
+                // invalidate the shard cache so we can be sure that all read shards are up to date
+                nodeShardCache.invalidate();
             }
             catch ( ConnectionException e ) {
                 throw new RuntimeException( "Unable to connect to cassandra", e );
@@ -598,15 +601,11 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
             addToHash( hasher, scope.getApplication() );
 
 
-            /** Commenting the full meta from the hash so we allocate/compact shards in a more controlled fashion
-
             for ( DirectedEdgeMeta.NodeMeta nodeMeta : directedEdgeMeta.getNodes() ) {
                 addToHash( hasher, nodeMeta.getId() );
                 hasher.putInt( nodeMeta.getNodeType().getStorageValue() );
             }
 
-            **/
-
 
             /**
              * Add our edge type

http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 1ce23b9..652c8d6 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -105,6 +105,8 @@ public class GraphManagerShardConsistencyIT {
 
     protected int TARGET_NUM_SHARDS = 5;
 
+    protected int POST_WRITE_SLEEP = 2000;
+
 
 
     @Before
@@ -175,7 +177,7 @@ public class GraphManagerShardConsistencyIT {
     public void writeThousandsSingleSource()
         throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
 
-        final Id sourceId = IdGenerator.createId( "sourceWrite_"+ UUIDGenerator.newTimeUUID().toString() );
+        final Id sourceId = IdGenerator.createId( "sourceWrite" );
         final String edgeType = "testWrite";
 
         final EdgeGenerator generator = new EdgeGenerator() {
@@ -183,7 +185,7 @@ public class GraphManagerShardConsistencyIT {
 
             @Override
             public Edge newEdge() {
-                Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "targetWrite_"+ UUIDGenerator.newTimeUUID().toString() ) );
+                Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "targetWrite" ) );
 
 
                 return edge;
@@ -199,7 +201,7 @@ public class GraphManagerShardConsistencyIT {
         };
 
 
-        final int numInjectors = 1;
+        final int numInjectors = 2;
 
         /**
          * create injectors.  This way all the caches are independent of one another.  This is the same as
@@ -277,9 +279,11 @@ public class GraphManagerShardConsistencyIT {
 
 
         final List<Throwable> failures = new ArrayList<>();
-        Thread.sleep(3000); // let's make sure everything is written
 
-        for(int i = 0; i < 2; i ++) {
+        logger.info("Sleeping {}ms before reading to ensure all compactions have completed", POST_WRITE_SLEEP);
+        Thread.sleep(POST_WRITE_SLEEP); // let's make sure everything is written
+
+        for(int i = 0; i < 1; i ++) {
 
 
             /**
@@ -303,6 +307,16 @@ public class GraphManagerShardConsistencyIT {
                 public void onFailure( final Throwable t ) {
                     failures.add( t );
                     logger.error( "Failed test!", t );
+
+                    final Iterator<ShardEntryGroup> groups = cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
+
+                    while ( groups.hasNext() ) {
+
+                        logger.info( "Shard entry group: {}", groups.next() );
+
+                    }
+
+
                 }
             } );
         }
@@ -409,7 +423,7 @@ public class GraphManagerShardConsistencyIT {
         throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
 
         final Id sourceId = IdGenerator.createId( "sourceDelete" );
-        final String deleteEdgeType = "testDelete_"+ UUIDGenerator.newTimeUUID().toString();
+        final String deleteEdgeType = "testDelete";
 
         final EdgeGenerator generator = new EdgeGenerator() {
 
@@ -432,7 +446,7 @@ public class GraphManagerShardConsistencyIT {
         };
 
 
-        final int numInjectors = 2;
+        final int numInjectors = 3;
 
         /**
          * create injectors.  This way all the caches are independent of one another.  This is the same as

http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index 6671dec..00406c0 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -101,10 +101,12 @@ public class NodeShardAllocationTest {
 
         final TimeService timeService = mock( TimeService.class );
 
+        final NodeShardCache nodeShardCache = mock( NodeShardCache.class);
+
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                         timeService, graphFig, shardGroupCompaction );
+                         timeService, graphFig, shardGroupCompaction, nodeShardCache );
 
 
         final long timeservicetime = System.currentTimeMillis();
@@ -131,10 +133,13 @@ public class NodeShardAllocationTest {
 
         final TimeService timeService = mock( TimeService.class );
 
+        final NodeShardCache nodeShardCache = mock( NodeShardCache.class);
+
+
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                      timeService, graphFig, shardGroupCompaction );
+                      timeService, graphFig, shardGroupCompaction, nodeShardCache);
 
         final Id nodeId = IdGenerator.createId( "test" );
         final String type = "type";
@@ -172,9 +177,12 @@ public class NodeShardAllocationTest {
 
         final TimeService timeService = mock( TimeService.class );
 
+        final NodeShardCache nodeShardCache = mock( NodeShardCache.class);
+
+
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                        timeService, graphFig, shardGroupCompaction );
+                        timeService, graphFig, shardGroupCompaction, nodeShardCache );
 
         final Id nodeId = IdGenerator.createId( "test" );
         final String type = "type";
@@ -216,9 +224,12 @@ public class NodeShardAllocationTest {
 
         final TimeService timeService = mock( TimeService.class );
 
+        final NodeShardCache nodeShardCache = mock( NodeShardCache.class);
+
+
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                        timeService, graphFig, shardGroupCompaction );
+                        timeService, graphFig, shardGroupCompaction, nodeShardCache );
 
         final Id nodeId = IdGenerator.createId( "test" );
         final String type = "type";
@@ -326,9 +337,12 @@ public class NodeShardAllocationTest {
 
         final TimeService timeService = mock( TimeService.class );
 
+        final NodeShardCache nodeShardCache = mock( NodeShardCache.class);
+
+
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                         timeService, graphFig, shardGroupCompaction );
+                         timeService, graphFig, shardGroupCompaction, nodeShardCache );
 
         final Id nodeId = IdGenerator.createId( "test" );
         final String type = "type";
@@ -412,9 +426,12 @@ public class NodeShardAllocationTest {
 
         final TimeService timeService = mock( TimeService.class );
 
+        final NodeShardCache nodeShardCache = mock( NodeShardCache.class);
+
+
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                       timeService, graphFig, shardGroupCompaction );
+                       timeService, graphFig, shardGroupCompaction, nodeShardCache );
 
         final Id nodeId = IdGenerator.createId( "test" );
         final String type = "type";
@@ -475,10 +492,13 @@ public class NodeShardAllocationTest {
 
         final TimeService timeService = mock( TimeService.class );
 
+        final NodeShardCache nodeShardCache = mock( NodeShardCache.class);
+
+
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                         timeService, graphFig, shardGroupCompaction );
+                         timeService, graphFig, shardGroupCompaction, nodeShardCache );
 
         final Id nodeId = IdGenerator.createId( "test" );
         final String type = "type";
@@ -613,9 +633,12 @@ public class NodeShardAllocationTest {
 
         final MutationBatch batch = mock( MutationBatch.class );
 
+        final NodeShardCache nodeShardCache = mock( NodeShardCache.class);
+
+
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                       timeService, graphFig, shardGroupCompaction );
+                       timeService, graphFig, shardGroupCompaction, nodeShardCache );
 
         final Id nodeId = IdGenerator.createId( "test" );
         final String type = "type";
@@ -704,9 +727,12 @@ public class NodeShardAllocationTest {
 
         when( graphFig.getShardMinDelta() ).thenReturn( tooSmallDelta );
 
+        final NodeShardCache nodeShardCache = mock( NodeShardCache.class);
+
+
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                      timeService, graphFig, shardGroupCompaction );
+                      timeService, graphFig, shardGroupCompaction, nodeShardCache );
 
 
         /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
index 65f19ff..666e30a 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
@@ -99,6 +99,8 @@ public class ShardGroupCompactionTest {
 
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
+        final NodeShardCache nodeShardCache = mock( NodeShardCache.class );
+
 
         final long delta = 10000;
 
@@ -116,7 +118,7 @@ public class ShardGroupCompactionTest {
 
         ShardGroupCompactionImpl compaction =
             new ShardGroupCompactionImpl( timeService, graphFig, nodeShardAllocation, shardedEdgeSerialization,
-                edgeColumnFamilies, keyspace, edgeShardSerialization, asyncTaskExecutor );
+                edgeColumnFamilies, keyspace, edgeShardSerialization, asyncTaskExecutor, nodeShardCache );
 
         DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( IdGenerator.createId( "source" ), "test" );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/resources/log4j.properties b/stack/corepersistence/graph/src/test/resources/log4j.properties
index e7f7524..5c6b045 100644
--- a/stack/corepersistence/graph/src/test/resources/log4j.properties
+++ b/stack/corepersistence/graph/src/test/resources/log4j.properties
@@ -37,10 +37,10 @@ log4j.logger.cassandra.db=ERROR
 #log4j.logger.org.apache.usergrid.persistence.core.rx=TRACE
 #log4j.logger.org.apache.usergrid.persistence.core.astyanax=TRACE
 #log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.parse=TRACE
-#log4j.logger.org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator=TRACE
+#log4j.logger.org.apache.usergrid.persistence.core.astyanax.MultiRowShardColumnIterator=TRACE
 #log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardsColumnIterator=TRACE
 #log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupColumnIterator=TRACE
-#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup=TRACE
+#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardEntryGroupIterator=TRACE
 #log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup=TRACE
 #log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl=TRACE