You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/04/12 00:21:33 UTC

[7/7] git commit: Added tests for shard allocation and caching.

Added tests for shard allocation and caching.

Time UUID ordering appears to be broken when the UUID is generated; this still needs to be fixed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/f1c61867
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/f1c61867
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/f1c61867

Branch: refs/heads/asyncqueue
Commit: f1c61867aa7ff73b8cbe17072b5e5ae47831d2f1
Parents: 21d4b64
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Apr 11 14:45:35 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Apr 11 14:45:35 2014 -0700

----------------------------------------------------------------------
 .../EntityCollectionManagerFactory.java         |   4 +-
 .../MultiTennantColumnFamilyDefinition.java     |   2 +-
 .../MvccLogEntrySerializationStrategyImpl.java  |   2 +-
 stack/corepersistence/graph/pom.xml             |   6 +
 .../usergrid/persistence/graph/GraphFig.java    |  26 +-
 .../graph/impl/shard/NodeShardAllocation.java   |  11 -
 .../impl/shard/NodeShardAllocationImpl.java     | 138 ++++---
 .../impl/shard/NodeShardApproximation.java      |  22 +-
 .../impl/shard/NodeShardApproximationImpl.java  |  75 +---
 .../graph/impl/shard/NodeShardCacheImpl.java    |  19 +-
 .../EdgeSeriesCounterSerialization.java         |  34 +-
 .../serialization/EdgeSeriesSerialization.java  |   2 +-
 .../graph/impl/shard/HyperLogLogTest.java       |   8 +-
 .../impl/shard/NodeShardAllocationTest.java     | 411 +++++++++++++++++++
 .../impl/shard/NodeShardApproximationTest.java  | 141 +++++++
 .../graph/impl/shard/NodeShardCacheTest.java    |   4 +-
 stack/corepersistence/model/pom.xml             |   8 +
 .../persistence/model/util/UUIDGenerator.java   | 217 +++++++---
 .../persistence/model/util/UUIDUtils.java       |  38 ++
 .../model/util/UUIDGeneratorTest.java           |  17 +
 20 files changed, 946 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
index a2f2ff7..eb5fb14 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
@@ -28,7 +28,7 @@ public interface EntityCollectionManagerFactory {
     /**
      * Create a new EntityCollectionManager for the given context. 
      * The EntityCollectionManager can safely be used on the current thread 
-     * and will cache responses.  The returned instance should not be shared 
+     * and will cache responses.  The returned instance should not be shared
      * among threads it will not be guaranteed to be thread safe.
      *
      * @param collectionScope The collectionScope collectionScope to use 
@@ -44,7 +44,7 @@ public interface EntityCollectionManagerFactory {
     /**
      * Create a new EntityCollectionManagerSync for the given context. 
      * The EntityCollectionManager can safely be used on the current thread 
-     * and will cache responses.  The returned instance should not be shared 
+     * and will cache responses.  The returned instance should not be shared
      * among threads it will not be guaranteed to be thread safe.  
      * This implementation will be synchronous. Try to use the consistency
      * implementation if possible

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/astyanax/MultiTennantColumnFamilyDefinition.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/astyanax/MultiTennantColumnFamilyDefinition.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/astyanax/MultiTennantColumnFamilyDefinition.java
index ddbafc2..55c070e 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/astyanax/MultiTennantColumnFamilyDefinition.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/astyanax/MultiTennantColumnFamilyDefinition.java
@@ -39,7 +39,7 @@ public class MultiTennantColumnFamilyDefinition {
      */
     public enum CacheOption {
         /**
-         * Use both row key cache and key cache
+         * Use both the row key cache and the key cache
          */
         ALL( "ALL" ),
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
index ec19298..90b5d57 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
@@ -237,7 +237,7 @@ public class MvccLogEntrySerializationStrategyImpl implements MvccLogEntrySerial
 
 
     /**
-     * Internal stage cache
+     * Internal cache of Stage values
      */
     private static class StageCache {
         private Map<Integer, Stage> values = new HashMap<Integer, Stage>( Stage.values().length );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/pom.xml b/stack/corepersistence/graph/pom.xml
index 64587a9..47c017d 100644
--- a/stack/corepersistence/graph/pom.xml
+++ b/stack/corepersistence/graph/pom.xml
@@ -39,6 +39,12 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-collections4</artifactId>
+        <version>4.0</version>
+    </dependency>
+
 
     <!-- utilized for hyperloglog cardinality terms -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index c4fae69..0032190 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -49,15 +49,15 @@ public interface GraphFig extends GuicyFig {
 
     public static final String READ_TIMEOUT = "usergrid.graph.read.timeout";
 
-    public static final String CACHE_TIMEOUT = "usergrid.graph.shard.timeout";
+    public static final String SHARD_SIZE = "usergrid.graph.shard.size";
 
-    public static final String CACHE_SIZE = "usergrid.graph.shard.size";
+    public static final String SHARD_CACHE_SIZE = "usergrid.graph.shard.cache.size";
 
     public static final String SHARD_CACHE_TIMEOUT = "usergrid.graph.shard.cache.timeout";
 
     public static final String COUNTER_PRECISION_LOSS = "usergrid.graph.shard.counter.precision.loss";
 
-    @Default( "1000" )
+    @Default("1000")
     @Key(SCAN_PAGE_SIZE)
     int getScanPageSize();
 
@@ -69,8 +69,8 @@ public interface GraphFig extends GuicyFig {
     @Key(WRITE_CL)
     String getWriteCL();
 
-    @Default( "10000" )
-    @Key( WRITE_TIMEOUT )
+    @Default("10000")
+    @Key(WRITE_TIMEOUT)
     int getWriteTimeout();
 
     /**
@@ -97,23 +97,21 @@ public interface GraphFig extends GuicyFig {
     int getRepairTimeout();
 
 
-    @Default( "10000" )
-    @Key( CACHE_SIZE )
-    int getShardCacheSize();
-
-    @Default( "30000" )
-    @Key( CACHE_TIMEOUT )
-    long getCacheTimeout();
+    @Default("10000")
+    @Key(SHARD_SIZE)
+    long getShardSize();
 
     @Default(".02")
     @Key(COUNTER_PRECISION_LOSS)
     double getShardCounterPrecisionLoss();
 
 
-    @Default( "10000" )
+    @Default("30000")
     @Key(SHARD_CACHE_TIMEOUT)
     long getShardCacheTimeout();
 
-
+    @Default( "250000" )
+    @Key( SHARD_CACHE_SIZE )
+    long getShardCacheSize();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocation.java
index bede2ab..05f6818 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocation.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocation.java
@@ -59,15 +59,4 @@ public interface NodeShardAllocation {
     public boolean auditMaxShard(final OrganizationScope scope, final Id nodeId, final String... edgeType);
 
 
-    /**
-     * Increment the shard Id the specified amount
-     * @param scope The scope
-     * @param nodeId The node id
-     * @param shardId The shard id
-     * @param count The count
-     * @param edgeType The edge type
-     */
-    public void increment(final OrganizationScope scope, final Id nodeId, final UUID shardId, int count, final String... edgeType);
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationImpl.java
index 4dc4b68..e6e8c66 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationImpl.java
@@ -20,9 +20,13 @@
 package org.apache.usergrid.persistence.graph.impl.shard;
 
 
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.UUID;
 
+import org.apache.commons.collections4.iterators.PushbackIterator;
+
 import org.apache.usergrid.persistence.collection.OrganizationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.consistency.TimeService;
@@ -33,6 +37,7 @@ import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.fasterxml.uuid.UUIDComparator;
 import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
@@ -49,101 +54,138 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
     private final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization;
     private final TimeService timeService;
     private final GraphFig graphFig;
+    private final Keyspace keyspace;
 
 
     @Inject
     public NodeShardAllocationImpl( final EdgeSeriesSerialization edgeSeriesSerialization,
                                     final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization,
-                                    final TimeService timeService, final GraphFig graphFig ) {
+                                    final TimeService timeService, final GraphFig graphFig, final Keyspace keyspace ) {
         this.edgeSeriesSerialization = edgeSeriesSerialization;
         this.edgeSeriesCounterSerialization = edgeSeriesCounterSerialization;
         this.timeService = timeService;
         this.graphFig = graphFig;
+        this.keyspace = keyspace;
     }
 
 
     @Override
     public Iterator<UUID> getShards( final OrganizationScope scope, final Id nodeId, final UUID maxShardId,
                                      final int pageSize, final String... edgeTypes ) {
-        return edgeSeriesSerialization.getEdgeMetaData( scope, nodeId, maxShardId, pageSize, edgeTypes );
-    }
 
+        final Iterator<UUID> existingShards =
+                edgeSeriesSerialization.getEdgeMetaData( scope, nodeId, maxShardId, pageSize, edgeTypes );
 
-    @Override
-    public boolean auditMaxShard( final OrganizationScope scope, final Id nodeId, final String... edgeType ) {
+        final PushbackIterator<UUID> pushbackIterator = new PushbackIterator( existingShards );
 
-        final UUID now = UUIDGenerator.newTimeUUID();
 
-        Iterator<UUID> maxShards = getShards( scope, nodeId, MAX_UUID, 1, edgeType );
+        final UUID now = UUIDGenerator.newTimeUUID(timeService.getCurrentTime());
 
-        //if the first shard has already been allocated, do nothing.
 
-        //now is already > than the max, don't do anything
-        if ( maxShards.hasNext() && UUIDComparator.staticCompare( now, maxShards.next() ) < 0 ) {
-            return false;
+        final List<UUID> futures = new ArrayList<UUID>();
+
+
+        //loop through all shards, any shard > now+1 should be deleted
+        while ( pushbackIterator.hasNext() ) {
+
+            final UUID value = pushbackIterator.next();
+
+            //we're done, our current time uuid is greater than the value stored
+            if ( UUIDComparator.staticCompare( now, value ) > 0 ) {
+                //push it back into the iterator
+                pushbackIterator.pushback( value );
+                break;
+            }
+
+            futures.add( value );
         }
 
-        final long newShardTime = timeService.getCurrentTime() + graphFig.getCacheTimeout()*2;
 
-        //allocate a new shard at least now+ 2x our shard timeout.  We want to be sure that all replicas pick up on the new
-        //shard
+        //we have more than 1 future value, we need to remove it
+
+        MutationBatch cleanup = keyspace.prepareMutationBatch();
+
+        //remove all futures except the last one, it is the only value we shouldn't lazy remove
+        for ( int i = futures.size() - 2; i > 0; i-- ) {
+            final UUID toRemove = futures.get( i );
+
+            final MutationBatch batch = edgeSeriesSerialization.removeEdgeMeta( scope, nodeId, toRemove, edgeTypes );
+
+            cleanup.mergeShallow( batch );
+        }
 
-        final UUID futureUUID = UUIDGenerator.newTimeUUID(newShardTime);
 
         try {
-            this.edgeSeriesSerialization.writeEdgeMeta( scope, nodeId, futureUUID, edgeType ).execute();
+            cleanup.execute();
         }
         catch ( ConnectionException e ) {
-            throw new RuntimeException( "Unable to write the new edge metadata" );
+            throw new RuntimeException( "Unable to remove future shards, mutation error", e );
         }
 
-        UUID max = null;
 
+        final int futuresSize =  futures.size();
 
-        MutationBatch rollup = null;
+        if ( futuresSize > 0 ) {
+            pushbackIterator.pushback( futures.get( futuresSize - 1 ) );
+        }
 
-        Iterator<UUID> shards = getShards( scope, nodeId, MAX_UUID, 1000, edgeType );
 
-        while ( shards.hasNext() ) {
+        return pushbackIterator;
+    }
 
-            final UUID shardId = shards.next();
 
-            if ( UUIDComparator.staticCompare( shardId, max ) >= 0 ) {
-                break;
-            }
+    @Override
+    public boolean auditMaxShard( final OrganizationScope scope, final Id nodeId, final String... edgeType ) {
 
+        final UUID now = UUIDGenerator.newTimeUUID( timeService.getCurrentTime() );
 
-            //remove the edge that is too large from the node shard allocation
-            final MutationBatch batch = edgeSeriesSerialization.removeEdgeMeta( scope, nodeId, shardId, edgeType );
+        final Iterator<UUID> maxShards = getShards( scope, nodeId, MAX_UUID, 1, edgeType );
 
-            if ( rollup == null ) {
-                rollup = batch;
-            }
-            else {
-                rollup.mergeShallow( batch );
-            }
 
+        //if no shard has been allocated yet, do nothing.
 
-            //while our max value is > than the value we just created, delete it
+        //there is no max shard to audit, don't do anything
+        if ( !maxShards.hasNext() ) {
+            return false;
         }
 
-        if ( rollup != null ) {
-            try {
-                rollup.execute();
-            }
-            catch ( ConnectionException e ) {
-                throw new RuntimeException( "Unable to cleanup allocated shards" );
-            }
+        final UUID maxShard = maxShards.next();
+
+        /**
+         * Nothing to do, it's already in the future
+         */
+        if ( UUIDComparator.staticCompare( maxShard, now ) > 0 ) {
+            return false;
         }
 
-        return true;
-    }
 
+        /**
+         * Check out if we have a count for our shard allocation
+         */
+        final long count = edgeSeriesCounterSerialization.getCount( scope, nodeId, maxShard, edgeType );
 
-    @Override
-    public void increment( final OrganizationScope scope, final Id nodeId, final UUID shardId, final int count,
-                           final String... edgeType ) {
-        //delegate
-//        edgeSeriesCounterSerialization.incrementMetadataCount( scope, nodeId, shardId, count, edgeType );
+        if ( count < graphFig.getShardSize() ) {
+            return false;
+        }
+
+        //try to get a lock here, and fail if one isn't present
+
+        final long newShardTime = timeService.getCurrentTime() + graphFig.getShardCacheTimeout() * 2;
+
+        //allocate a new shard at least now+ 2x our shard timeout.  We want to be sure that all replicas pick up on
+        // the new
+        //shard
+
+        final UUID futureUUID = UUIDGenerator.newTimeUUID( newShardTime );
+
+        try {
+            this.edgeSeriesSerialization.writeEdgeMeta( scope, nodeId, futureUUID, edgeType ).execute();
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to write the new edge metadata" );
+        }
+
+
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximation.java
index 8eda5c7..f86b95e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximation.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximation.java
@@ -33,19 +33,21 @@ public interface NodeShardApproximation {
 
 
     /**
-       * Increment the shard Id the specified amount
-       * @param scope The scope
-       * @param nodeId The node id
-       * @param shardId The shard id
-       * @param edgeType The edge type
-       */
-      public void increment(final OrganizationScope scope, final Id nodeId, final UUID shardId, final String... edgeType);
+     * Increment the shard Id the specified amount
+     *
+     * @param scope The scope
+     * @param nodeId The node id
+     * @param shardId The shard id
+     * @param count The amount to increment by
+     * @param edgeType The edge type
+     */
+    public void increment( final OrganizationScope scope, final Id nodeId, final UUID shardId,  final long count,
+                           final String... edgeType );
 
 
     /**
      * Get the approximation of the number of unique items
-     * @return
      */
-    public long getCount(final OrganizationScope scope, final Id nodeId, final UUID shardId, final String... edgeType);
-
+    public long getCount( final OrganizationScope scope, final Id nodeId, final UUID shardId,
+                          final String... edgeType );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximationImpl.java
index 5a3db86..d8b0b63 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximationImpl.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.inject.Inject;
 
@@ -60,36 +61,26 @@ import com.netflix.astyanax.serializers.UUIDSerializer;
 
 public class NodeShardApproximationImpl implements NodeShardApproximation {
 
-    //TODO T.N. refactor into an expiring local cache.  We need each hyperlog to be it's own instance of a given shard
-    //if this is to work\
-    private static final UUIDSerializer UUID_SERIALIZER = UUIDSerializer.get();
-
-    /**
-     * We generate a new time uuid every time a new instance is started.  We should never re-use an instance.
-     */
-    private final UUID identity = UUIDGenerator.newTimeUUID();
-
     private final GraphFig graphFig;
 
-    private final LoadingCache<ShardKey, HyperLogLog> graphLogs;
-
-    private final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization;
+    //TODO, replace with our counters. This is just a POC for now.  HyperLogLog appears to use too much ram, waiting
+    //to hear back on http://dsiutils.di.unimi.it/#install
+    //for our use case
+    private final LoadingCache<ShardKey, AtomicLong> graphLogs;
 
 
     /**
      * Create a time shard approximation with the correct configuration.
      */
     @Inject
-    public NodeShardApproximationImpl( final GraphFig graphFig,
-                                       final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization ) {
+    public NodeShardApproximationImpl( final GraphFig graphFig) {
         this.graphFig = graphFig;
-        this.edgeSeriesCounterSerialization = edgeSeriesCounterSerialization;
 
         graphLogs = CacheBuilder.newBuilder()
-               .maximumSize( graphFig.getShardCacheTimeout() )
-               .build( new CacheLoader<ShardKey, HyperLogLog>() {
-                   public HyperLogLog load( ShardKey key ) {
-                       return loadCache( key );
+               .maximumSize( graphFig.getShardCacheSize() )
+               .build( new CacheLoader<ShardKey, AtomicLong>() {
+                   public AtomicLong load( ShardKey key ) {
+                       return new AtomicLong(  );
                    }
                } );
 
@@ -97,23 +88,14 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
 
 
     @Override
-    public void increment( final OrganizationScope scope, final Id nodeId, final UUID shardId,
+    public void increment( final OrganizationScope scope, final Id nodeId, final UUID shardId,  final long count,
                            final String... edgeType ) {
 
 
-
-        final ByteBuffer buff = UUID_SERIALIZER.toByteBuffer( shardId );
-
-        byte[] bytes = buff.array();
-
-
-
-        long longHash = MurmurHash.hash64(bytes, bytes.length );
-
         final ShardKey key = new ShardKey( scope, nodeId, shardId, edgeType );
 
         try {
-            graphLogs.get( key).offerHashed( longHash );
+            graphLogs.get( key).addAndGet(count);
         }
         catch ( ExecutionException e ) {
             throw new RuntimeException( "Unable to get hyperloglog from cache", e );
@@ -123,42 +105,23 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
     }
 
 
-    private HyperLogLog loadCache(ShardKey key){
-//         edgeSeriesCounterSerialization
-        return null;
-    }
-
-
     @Override
     public long getCount( final OrganizationScope scope, final Id nodeId, final UUID shardId,
                           final String... edgeType ) {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-
-    private byte[] hash( final OrganizationScope scope, final Id nodeId, final UUID shardId,
-                         final String... edgeTypes ) {
-        StringBuilder builder = new StringBuilder();
-
-        final Id organization = scope.getOrganization();
-
-        builder.append( organization.getUuid() );
-        builder.append( organization.getType() );
 
-        builder.append( nodeId.getUuid() );
-        builder.append( nodeId.getType() );
+        final ShardKey key = new ShardKey( scope, nodeId, shardId, edgeType );
 
-        builder.append( shardId.toString() );
 
-        for ( String edgeType : edgeTypes ) {
-            builder.append( edgeType );
+        try {
+            return graphLogs.get( key ).get();
+        }
+        catch ( ExecutionException e ) {
+            throw new RuntimeException("Unable to execute cache get", e);
         }
-
-        return null;
-//        return builder.toString().getBytes( CHARSET );
     }
 
 
+
     /**
      * Internal class for shard keys
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheImpl.java
index 714791f..fd03fc4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheImpl.java
@@ -77,7 +77,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
             public void propertyChange( final PropertyChangeEvent evt ) {
                 final String propertyName = evt.getPropertyName();
 
-                if ( propertyName.equals( GraphFig.CACHE_SIZE ) || propertyName.equals( GraphFig.CACHE_TIMEOUT ) ) {
+                if ( propertyName.equals( GraphFig.SHARD_CACHE_SIZE ) || propertyName.equals( GraphFig.SHARD_CACHE_TIMEOUT ) ) {
                     updateCache();
                 }
             }
@@ -122,23 +122,26 @@ public class NodeShardCacheImpl implements NodeShardCache {
     private void updateCache() {
 
         this.graphs = CacheBuilder.newBuilder().maximumSize( graphFig.getShardCacheSize() )
-                                  .expireAfterWrite( graphFig.getCacheTimeout(), TimeUnit.MILLISECONDS )
+                                  .expireAfterWrite( graphFig.getShardCacheSize(), TimeUnit.MILLISECONDS )
                                   .build( new CacheLoader<CacheKey, CacheEntry>() {
 
 
                                       @Override
                                       public CacheEntry load( final CacheKey key ) throws Exception {
 
-                                          //TODO, we need to put some sort of upper bounds on this, it could possibly
-                                          //get too large
-                                          final Iterator<UUID> edges = nodeShardAllocation
-                                                  .getShards( key.scope, key.id, MAX_UUID, SHARD_PAGE_SIZE, key.types );
-
 
                                           /**
-                                           * Perform an async audit in case we need to allocate a new shard
+                                           * Perform an audit in case we need to allocate a new shard
                                            */
                                           nodeShardAllocation.auditMaxShard( key.scope, key.id, key.types );
+                                             //TODO, we need to put some sort of upper bounds on this, it could possibly
+                                          //get too large
+
+
+
+
+                                          final Iterator<UUID> edges = nodeShardAllocation
+                                                                                          .getShards( key.scope, key.id, MAX_UUID, SHARD_PAGE_SIZE, key.types );
 
                                           return new CacheEntry( edges );
                                       }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesCounterSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesCounterSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesCounterSerialization.java
index c706137..f8fa615 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesCounterSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesCounterSerialization.java
@@ -59,18 +59,8 @@ public interface EdgeSeriesCounterSerialization {
      * @param shardId The shard Id to use
      * @param types The types to write to.  Can be edge type, or edgeType+id type
      */
-    public MutationBatch writeMetaDataLog( OrganizationScope scope, Id nodeId, UUID shardId, HyperLogLog log, UUID workerId, String... types );
+    public MutationBatch writeMetaDataLog( OrganizationScope scope, Id nodeId, UUID shardId, long count, String... types );
 
-    /**
-     * return all persistent instance of the hyperlog log
-     * @param scope
-     * @param nodeId
-     * @param shardId
-     * @param types
-     * @return
-     */
-
-    public List<HyperLogLog> getMetadataLog( OrganizationScope scope, Id nodeId, UUID shardId, String... types );
 
     /**
      * Get the most recent rollup of all of the given summations.  If one is not present the optional will be empty
@@ -80,28 +70,8 @@ public interface EdgeSeriesCounterSerialization {
      * @param types
      * @return
      */
-    public Optional<HyperLogLog> getSummationLog(OrganizationScope scope, Id nodeId, UUID shardId, String... types);
-
-    /**
-     * Write the summation log.  Uses the timestamp passed in the column
-     * @param scope The scope to write
-     * @param nodeId The id in the edge
-     * @param shardId The shard Id to use
-     * @param log The log to write
-     * @param creatorId The identifier of this writer
-     * @param types The types to write to.  Can be edge type, or edgeType+id type
-     */
-    public MutationBatch writeSummationLog( OrganizationScope scope, Id nodeId, UUID shardId, HyperLogLog log, UUID workerId,  String... types );
+    public long getCount(OrganizationScope scope, Id nodeId, UUID shardId, String... types);
 
 
-    /**
-     * Remove the slice from the edge meta data from the types.
-     * @param scope
-     * @param nodeId
-     * @param shardId
-     * @param types
-     * @return
-     */
-    public MutationBatch removeEdgeMetadataCount( OrganizationScope scope, Id nodeId, UUID shardId, String... types );
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java
index 17c342f..a60e96e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java
@@ -45,7 +45,7 @@ public interface EdgeSeriesSerialization {
     public MutationBatch writeEdgeMeta(OrganizationScope scope, Id nodeId, UUID slice,  String... types);
 
     /**
-     * Get an iterator of all meta data and types
+     * Get an iterator of all meta data and types.  Returns a range from High to low
      * @param scope The organization scope
      * @param nodeId The id of the node
      * @param start The uuid to start seeking from.  Values <= this value will be returned.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/HyperLogLogTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/HyperLogLogTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/HyperLogLogTest.java
index e40bf6c..241fd62 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/HyperLogLogTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/HyperLogLogTest.java
@@ -43,7 +43,7 @@ public class HyperLogLogTest {
 
     private static final UUIDSerializer UUID_SER = UUIDSerializer.get();
 
-    private static final double LOSS = 0.01d;
+    private static final double LOSS = 0.10d;
 
 
     @Test
@@ -133,9 +133,9 @@ public class HyperLogLogTest {
         log.info( "Expected max is {}", max );
 
 
-//        assertTrue( "Min is <= the cardinality", min <= cardinality );
-//
-//        assertTrue( "Cardinality is <= max ", cardinality <= max );
+        assertTrue( "Min is <= the cardinality", min <= cardinality );
+
+        assertTrue( "Cardinality is <= max ", cardinality <= max );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationTest.java
new file mode 100644
index 0000000..ee4472d
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationTest.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.impl.shard;
+
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.consistency.TimeService;
+import org.apache.usergrid.persistence.graph.impl.Constants;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSeriesCounterSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSeriesSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class NodeShardAllocationTest {
+
+
+    private GraphFig graphFig;
+
+
+    protected OrganizationScope scope;
+
+
+    @Before
+    public void setup() {
+        scope = mock( OrganizationScope.class );
+        //the mocked scope hands out a mocked organization id of type "organization" with a fresh time UUID
+        Id orgId = mock( Id.class );
+
+        when( orgId.getType() ).thenReturn( "organization" );
+        when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+        when( scope.getOrganization() ).thenReturn( orgId );
+
+        graphFig = mock( GraphFig.class );
+        //fixed config values; the tests below derive their expected counts and shard times from these
+        when( graphFig.getShardCacheSize() ).thenReturn( 10000l );
+        when( graphFig.getShardSize() ).thenReturn( 20000l );
+        when( graphFig.getShardCacheTimeout()).thenReturn( 30000l );
+    }
+
+
+    @Test
+    public void noShards() {
+        final EdgeSeriesSerialization edgeSeriesSerialization = mock( EdgeSeriesSerialization.class );
+
+        final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization =
+                mock( EdgeSeriesCounterSerialization.class );
+
+        //every collaborator is a mock; only the calls stubbed below return meaningful values
+        final TimeService timeService = mock( TimeService.class );
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch batch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+        NodeShardAllocation approximation =
+                new NodeShardAllocationImpl( edgeSeriesSerialization, edgeSeriesCounterSerialization, timeService,
+                        graphFig, keyspace );
+
+        final Id nodeId = createId( "test" );
+        final String type = "type";
+        final String subType = "subType";
+
+        /**
+         * Mock up returning an empty iterator, our audit shouldn't create a new shard
+         */
+        when( edgeSeriesSerialization
+                .getEdgeMetaData( same( scope ), same( nodeId ), any( UUID.class ), eq( 1 ), same( type ),
+                        same( subType ) ) ).thenReturn( Collections.<UUID>emptyList().iterator() );
+
+        final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+        //the audit result must be false, i.e. no new shard was allocated
+        assertFalse( "No shard allocated", result );
+    }
+
+
+    @Test
+    public void existingFutureShard() {
+        final EdgeSeriesSerialization edgeSeriesSerialization = mock( EdgeSeriesSerialization.class );
+
+        final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization =
+                mock( EdgeSeriesCounterSerialization.class );
+
+
+        final TimeService timeService = mock( TimeService.class );
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+
+        final MutationBatch batch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+
+        NodeShardAllocation approximation =
+                new NodeShardAllocationImpl( edgeSeriesSerialization, edgeSeriesCounterSerialization, timeService,
+                        graphFig, keyspace );
+
+        final Id nodeId = createId( "test" );
+        final String type = "type";
+        final String subType = "subType";
+
+        //pin the time service to a fixed instant so the future shard below is relative to a known "now"
+        final long timeservicetime = System.currentTimeMillis();
+
+        when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
+
+        final UUID futureShard = UUIDGenerator.newTimeUUID( timeservicetime + graphFig.getShardCacheTimeout() * 2 );
+
+        /**
+         * Mock up returning only a future shard, 2x the shard cache timeout ahead of the time service
+         */
+        when( edgeSeriesSerialization
+                .getEdgeMetaData( same( scope ), same( nodeId ), any( UUID.class ), eq( 1 ), same( type ),
+                        same( subType ) ) ).thenReturn( Arrays.asList( futureShard ).iterator() );
+
+        final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+        //a shard already exists in the future, so the audit must not allocate another one
+        assertFalse( "No shard allocated", result );
+    }
+
+
+    @Test
+    public void lowCountFutureShard() {
+        final EdgeSeriesSerialization edgeSeriesSerialization = mock( EdgeSeriesSerialization.class );
+
+        final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization =
+                mock( EdgeSeriesCounterSerialization.class );
+
+
+        final TimeService timeService = mock( TimeService.class );
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch batch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+
+        NodeShardAllocation approximation =
+                new NodeShardAllocationImpl( edgeSeriesSerialization, edgeSeriesCounterSerialization, timeService,
+                        graphFig, keyspace );
+
+        final Id nodeId = createId( "test" );
+        final String type = "type";
+        final String subType = "subType";
+
+
+        final long timeservicetime = System.currentTimeMillis();
+
+        when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
+
+
+        /**
+         * Mock up returning only the min shard; no future shard exists yet
+         */
+        when( edgeSeriesSerialization
+                .getEdgeMetaData( same( scope ), same( nodeId ), any( UUID.class ), eq( 1 ), same( type ),
+                        same( subType ) ) ).thenReturn( Arrays.asList( Constants.MIN_UUID ).iterator() );
+
+
+        //return a shard size < our max by 1
+
+        final long count = graphFig.getShardSize() - 1;
+
+        when( edgeSeriesCounterSerialization
+                .getCount( same( scope ), same( nodeId ), eq( Constants.MIN_UUID ), same( type ), same( subType ) ) )
+                .thenReturn( count );
+
+        final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+        //one below the configured shard size must not trigger an allocation
+        assertFalse( "Shard allocated", result );
+    }
+
+
+    @Test
+    public void equalCountFutureShard() {
+        final EdgeSeriesSerialization edgeSeriesSerialization = mock( EdgeSeriesSerialization.class );
+
+        final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization =
+                mock( EdgeSeriesCounterSerialization.class );
+
+
+        final TimeService timeService = mock( TimeService.class );
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch batch = mock(MutationBatch.class);
+
+        when(keyspace.prepareMutationBatch()).thenReturn( batch );
+
+
+        NodeShardAllocation approximation =
+                new NodeShardAllocationImpl( edgeSeriesSerialization, edgeSeriesCounterSerialization, timeService,
+                        graphFig, keyspace );
+
+        final Id nodeId = createId( "test" );
+        final String type = "type";
+        final String subType = "subType";
+
+
+        final long timeservicetime = System.currentTimeMillis();
+
+        when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
+
+
+        /**
+         * Mock up returning a min shard
+         */
+        when( edgeSeriesSerialization
+                .getEdgeMetaData( same( scope ), same( nodeId ), any( UUID.class ), eq( 1 ), same( type ),
+                        same( subType ) ) ).thenReturn( Arrays.asList( Constants.MIN_UUID ).iterator() );
+
+
+        final long shardCount = graphFig.getShardSize();
+
+        //return a shard size equal to our max
+        when( edgeSeriesCounterSerialization
+                .getCount( same( scope ), same( nodeId ), eq( Constants.MIN_UUID ), same( type ), same( subType ) ) )
+                .thenReturn( shardCount );
+
+        ArgumentCaptor<UUID> newUUIDValue = ArgumentCaptor.forClass( UUID.class );
+
+
+        //mock up our mutation
+        when( edgeSeriesSerialization
+                .writeEdgeMeta( same( scope ), same( nodeId ), newUUIDValue.capture(), same( type ), same( subType ) ) )
+                .thenReturn( mock( MutationBatch.class ) );
+
+
+        final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+
+        assertTrue( "Shard allocated", result );
+
+        //check our new allocated UUID, it is expected at the mocked current time + 2x the shard cache timeout
+        //NOTE(review): equality only holds if newTimeUUID( long ) is deterministic for one timestamp -- confirm
+        final long expectedUUIDTime = timeservicetime + 2 * graphFig.getShardCacheTimeout();
+
+        UUID expectedUUID = UUIDGenerator.newTimeUUID( expectedUUIDTime );
+
+        assertEquals( "Expected UUID at 2x timeout generated", expectedUUID, newUUIDValue.getValue() );
+    }
+
+
+
+
+    @Test
+    public void futureCountShardCleanup() {
+        final EdgeSeriesSerialization edgeSeriesSerialization = mock( EdgeSeriesSerialization.class );
+
+        final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization =
+                mock( EdgeSeriesCounterSerialization.class );
+
+
+        final TimeService timeService = mock( TimeService.class );
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch batch = mock(MutationBatch.class);
+
+        when(keyspace.prepareMutationBatch()).thenReturn( batch );
+
+
+        NodeShardAllocation approximation =
+                new NodeShardAllocationImpl( edgeSeriesSerialization, edgeSeriesCounterSerialization, timeService,
+                        graphFig, keyspace );
+
+        final Id nodeId = createId( "test" );
+        final String type = "type";
+        final String subType = "subType";
+
+
+        /**
+         * Use the time service to generate UUIDS
+         */
+        final long timeservicetime = System.currentTimeMillis();
+
+
+        when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
+
+        assertTrue("Shard cache mocked", graphFig.getShardCacheTimeout() > 0);
+
+
+        /**
+         * Simulates clock drift when 2 nodes create future shards near one another
+         */
+        final long futureTime = timeService.getCurrentTime()  + 2 * graphFig.getShardCacheTimeout();
+
+
+        assertTrue("Future time is actually in the future", futureTime > timeService.getCurrentTime());
+
+
+
+        //three future shards created 1 ms apart; futureUUID1 is the oldest, futureUUID3 the newest
+        UUID futureUUID1 = UUIDGenerator.newTimeUUID(futureTime);
+
+        UUID futureUUID2 = UUIDGenerator.newTimeUUID(futureTime+1);
+
+        UUID futureUUID3 = UUIDGenerator.newTimeUUID(futureTime+2);
+
+
+        UUID now = UUIDGenerator.newTimeUUID( timeService.getCurrentTime() );
+
+
+        //verify all future IDS are greater than "now" if they're not, then there's something wrong with the
+        //UUID Generation, which is crucial to this functionality
+
+        assertTrue( UUIDComparator.staticCompare( futureUUID1, now ) > 0);
+
+        assertTrue( UUIDComparator.staticCompare( futureUUID2, now ) > 0);
+
+        assertTrue( UUIDComparator.staticCompare( futureUUID3, now ) > 0);
+
+
+        final int pageSize = 100;
+
+        /**
+         * Mock up returning the 3 future shards from newest to oldest, followed by the min shard
+         */
+        when( edgeSeriesSerialization
+                .getEdgeMetaData( same( scope ), same( nodeId ), any( UUID.class ), eq( pageSize ), same( type ),
+                        same( subType ) ) ).thenReturn( Arrays.asList(futureUUID3, futureUUID2, futureUUID1, Constants.MIN_UUID ).iterator() );
+
+
+        //captures every shard id that is handed to removeEdgeMeta
+        ArgumentCaptor<UUID> newUUIDValue = ArgumentCaptor.forClass( UUID.class );
+
+
+
+
+        //mock up our mutation
+        when( edgeSeriesSerialization
+                .removeEdgeMeta( same( scope ), same( nodeId ), newUUIDValue.capture(), same( type ), same( subType ) ) )
+                .thenReturn( mock( MutationBatch.class ) );
+
+
+        final Iterator<UUID>
+                result = approximation.getShards( scope, nodeId, Constants.MAX_UUID, pageSize, type, subType );
+
+        //only the oldest future shard and the min shard may be returned, the 2 newer future shards are dropped
+        assertTrue("Shards present", result.hasNext());
+
+        assertEquals("Only single next shard returned", futureUUID1,  result.next());
+
+        assertTrue("Shards present", result.hasNext());
+
+        assertEquals("Previous shard present", Constants.MIN_UUID, result.next());
+
+        assertFalse("No shards left", result.hasNext());
+
+        /**
+         * Now we need to verify that a removal was issued for each of the 2 newer future shards
+         */
+
+        List<UUID> values = newUUIDValue.getAllValues();
+
+        //the expected value must be passed explicitly, otherwise the message is compared against the size
+        assertEquals("2 values removed", 2, values.size());
+
+        assertEquals("Deleted Max Future", futureUUID3, values.get( 0 ));
+        assertEquals("Deleted Next Future", futureUUID2, values.get( 1 ));
+
+
+
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximationTest.java
new file mode 100644
index 0000000..1b7725f
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximationTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.impl.shard;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSeriesCounterSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class NodeShardApproximationTest {
+
+
+    private GraphFig graphFig;
+
+
+    protected OrganizationScope scope;
+
+
+    @Before
+    public void setup() {
+        scope = mock( OrganizationScope.class );
+        //the mocked scope hands out a mocked organization id of type "organization" with a fresh time UUID
+        Id orgId = mock( Id.class );
+
+        when( orgId.getType() ).thenReturn( "organization" );
+        when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+        when( scope.getOrganization() ).thenReturn( orgId );
+
+        graphFig = mock( GraphFig.class );
+        //fixed config values handed to the NodeShardApproximationImpl built in each test
+        when( graphFig.getShardCacheSize() ).thenReturn( 10000l );
+        when(graphFig.getShardSize()).thenReturn( 250000l );
+    }
+
+
+    @Test
+    public void testSingleShard() {
+        //NOTE(review): this mock is never handed to the approximation below; it looks like a leftover
+        EdgeSeriesCounterSerialization ser = mock( EdgeSeriesCounterSerialization.class );
+
+        NodeShardApproximation approximation = new NodeShardApproximationImpl( graphFig );
+
+        //a shard that has never been incremented must report a count of 0
+        final Id id = createId( "test" );
+        final UUID shardId = UUIDGenerator.newTimeUUID();
+        final String type = "type";
+        final String type2 = "subType";
+
+        long count = approximation.getCount( scope, id, shardId, type, type2 );
+
+        assertEquals( 0, count );
+    }
+
+
+    @Test
+    public void testMultipleShard() throws ExecutionException, InterruptedException {
+        final NodeShardApproximation approximation = new NodeShardApproximationImpl( graphFig );
+
+        final int increments = 1000000;
+        final int workers = 100;
+
+        final Id id = createId( "test" );
+        final String type = "type";
+        final String type2 = "subType";
+
+        final ExecutorService executor = Executors.newFixedThreadPool( workers );
+
+        //always release the pool's threads, even when a worker fails or an assertion throws
+        try {
+            final List<Future<Long>> futures = new ArrayList<>( workers );
+
+            for ( int i = 0; i < workers; i++ ) {
+                //each worker increments its own shard id, so the expected count per shard is "increments"
+                final Future<Long> future = executor.submit( new Callable<Long>() {
+                    @Override
+                    public Long call() throws Exception {
+                        final UUID shardId = UUIDGenerator.newTimeUUID();
+
+                        long count = approximation.getCount( scope, id, shardId, type, type2 );
+
+                        assertEquals( 0, count );
+
+                        for ( int j = 0; j < increments; j++ ) {
+                            approximation.increment( scope, id, shardId, 1, type, type2 );
+                        }
+
+                        return approximation.getCount( scope, id, shardId, type, type2 );
+                    }
+                } );
+
+                futures.add( future );
+            }
+
+            for ( Future<Long> future : futures ) {
+                final long value = future.get().longValue();
+
+                assertEquals( increments, value );
+            }
+        }
+        finally {
+            executor.shutdownNow();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheTest.java
index b0acd5a..b56711b 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheTest.java
@@ -250,8 +250,8 @@ public class NodeShardCacheTest {
 
     private GraphFig getFigMock() {
         final GraphFig graphFig = mock( GraphFig.class );
-        when( graphFig.getShardCacheSize() ).thenReturn( 1000 );
-        when( graphFig.getCacheTimeout() ).thenReturn( 30000l );
+        when( graphFig.getShardCacheSize() ).thenReturn( 1000l );
+        when( graphFig.getShardCacheTimeout() ).thenReturn( 30000l );
 
         return graphFig;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/model/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/model/pom.xml b/stack/corepersistence/model/pom.xml
index 42d64ec..c671128 100644
--- a/stack/corepersistence/model/pom.xml
+++ b/stack/corepersistence/model/pom.xml
@@ -41,6 +41,14 @@
             <scope>test</scope>
         </dependency>
 
+
+      <dependency>
+          <groupId>org.apache.commons</groupId>
+          <artifactId>commons-lang3</artifactId>
+          <version>${commons.lang.version}</version>
+          <scope>test</scope>
+      </dependency>
+
     </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/UUIDGenerator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/UUIDGenerator.java b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/UUIDGenerator.java
index f2c4fa8..af09915 100644
--- a/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/UUIDGenerator.java
+++ b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/UUIDGenerator.java
@@ -1,93 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.usergrid.persistence.model.util;
 
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 
 import com.fasterxml.uuid.EthernetAddress;
 import com.fasterxml.uuid.TimestampSynchronizer;
 import com.fasterxml.uuid.UUIDTimer;
 import com.fasterxml.uuid.impl.TimeBasedGenerator;
 
+import static com.fasterxml.uuid.impl.UUIDUtil.BYTE_OFFSET_CLOCK_HI;
+import static com.fasterxml.uuid.impl.UUIDUtil.BYTE_OFFSET_CLOCK_LO;
+import static com.fasterxml.uuid.impl.UUIDUtil.BYTE_OFFSET_CLOCK_MID;
+import static com.fasterxml.uuid.impl.UUIDUtil.BYTE_OFFSET_CLOCK_SEQUENCE;
+
 
 /**
- * TODO replace this with the Astyanax generator libs
- * @author: tnine
- *
+ * TODO replace this with the existing usergridUUID utils, we seem to be getting inconsistent behavior from it
  */
 public class UUIDGenerator {
 
 
-    private static final TimestampSynchronizer synchronize = new TimestampSynchronizer() {
+    private static final int[] MICROS = new int[1000];
+
+        // Offset lookup table: slot n holds n * 10
+        static {
+            for ( int x = 0; x < MICROS.length; x++ ) {
+                MICROS[x] = x * 10;
+            }
+        }
+
+        // Fair lock serializing access to the counters below; final so the lock instance can never be replaced
+        private static final ReentrantLock tsLock = new ReentrantLock( true );
+
+        public static final UUID MIN_TIME_UUID = UUID.fromString( "00000000-0000-1000-8000-000000000000" );
+
+        public static final UUID MAX_TIME_UUID = UUID.fromString( "ffffffff-ffff-1fff-bfff-ffffffffffff" );
+
+        public static final UUID ZERO_UUID = new UUID( 0, 0 );
+        // Millisecond most recently observed by newTimeUUID(); only touched while tsLock is held
+        private static long timestampMillisNow = System.currentTimeMillis();
+        // Slot counters for newTimeUUID() and newTimeUUID(long) respectively
+        private static final AtomicInteger currentMicrosPoint = new AtomicInteger( 0 );
+        private static final AtomicInteger customMicrosPointer = new AtomicInteger( 0 );
 
-        /**
-         * Pointer to the last value we returned
-         */
-        private long last = 0;
 
         /**
-         * The number of ticks that can be used in the millisecond.  In a time UUID a tick is divided into 1/10000 of
-         * a millisecond
-         *
+         * Return the "next" UUID in microsecond resolution. <b>WARNING</b>: this is designed to return the next unique
+         * timestamped UUID for this JVM. Depending on the call rate, this method may block internally to ensure that
+         * "now" is kept in sync with the UUIDs being generated by this call.
+         * <p/>
+         * In other words, we will intentionally burn CPU ensuring that this method is not executed more than 10k - 1 times
+         * per millisecond and guarantee that those microseconds held within are sequential.
+         * <p/>
+         * If we did not do this, you would get <b>timestamp collision</b> even though the UUIDs will technically be
+         * 'unique.'
          */
-        private AtomicInteger ticks = new AtomicInteger();
+        public static UUID newTimeUUID() {
+            // Take the next slot of the current millisecond; MICROS maps slot n to an offset of n * 10.
+            // Past slot 990 we sleep 1ms while still holding the lock, so later callers see a fresh millisecond.
+            tsLock.lock();
+            long ts = System.currentTimeMillis();
+            if ( ts > timestampMillisNow ) {
+                timestampMillisNow = ts;
+                currentMicrosPoint.set( 0 );
+            }
+            int pointer = currentMicrosPoint.getAndIncrement();
+            try {
+                if ( pointer > 990 ) {
+                    TimeUnit.MILLISECONDS.sleep( 1L );
+                }
+            }
+            catch ( InterruptedException ex ) {
+                Thread.currentThread().interrupt(); // restore the flag for the caller instead of swallowing it
+            }
+            finally {
+                tsLock.unlock();
+            }
+            return newTimeUUID( ts, MICROS[pointer] );
+        }
 
 
-        @Override
-        protected long initialize() throws IOException {
+        private static final long KCLOCK_OFFSET = 0x01b21dd213814000L;
+        private static final long KCLOCK_MULTIPLIER_L = 10000L;
 
-            last = System.currentTimeMillis();
-            return last;
-        }
+        private static final Random CLOCK_SEQ_RANDOM = new Random();
 
 
-        @Override
-        protected void deactivate() throws IOException {
-            //no op
+        // Random clock sequence: keeps the low 14 bits (mask 0x3FFF) of a fresh random int
+        private static int getRandomClockSequence() {
+            return CLOCK_SEQ_RANDOM.nextInt() & 0x3FFF;
         }
 
 
-        @Override
-        protected long update( long now ) throws IOException {
-            /**
-             * Our timestamp is greater just use that and reset last
-             */
-            if ( now > last ) {
-                last = now;
-                ticks.set( 0 );
-                return last;
-            }
+        private static void setTimestamp( long timestamp, byte[] uuidBytes, int clockSeq, int timeOffset ) {
+            // Scale the timestamp by KCLOCK_MULTIPLIER_L, shift it by KCLOCK_OFFSET, then add the caller's offset
+            timestamp *= KCLOCK_MULTIPLIER_L;
+            timestamp += KCLOCK_OFFSET;
+            timestamp += timeOffset;
 
-            //we have the same value (since now should always be increasing) increment a tick
-            return last + ticks.incrementAndGet();
+            // Store the clock sequence: high byte first, then the low byte
+            uuidBytes[BYTE_OFFSET_CLOCK_SEQUENCE] = ( byte ) ( clockSeq >> 8 );
+            uuidBytes[BYTE_OFFSET_CLOCK_SEQUENCE + 1] = ( byte ) clockSeq;
+
+            // Set variant: clear the top two bits of the first clock-sequence byte, then set the highest one
+            uuidBytes[BYTE_OFFSET_CLOCK_SEQUENCE] &= 0x3F;
+            uuidBytes[BYTE_OFFSET_CLOCK_SEQUENCE] |= 0x80;
+            setTime( uuidBytes, timestamp );
         }
-    };
 
 
-    private static final Random random = new Random();
-    private static final UUIDTimer timer;
+        @SuppressWarnings("all")
+        private static void setTime( byte[] uuidBytes, long timestamp ) {
+            // Writes the 64-bit timestamp into the HI, MID and LO time fields of uuidBytes, then stamps the version
+            // Time fields aren't nicely split across the UUID, so can't just
+            // linearly dump the stamp:
+            int clockHi = ( int ) ( timestamp >>> 32 );
+            int clockLo = ( int ) timestamp;
+            // Upper 32 bits: two bytes at CLOCK_HI followed by two bytes at CLOCK_MID, most significant first
+            uuidBytes[BYTE_OFFSET_CLOCK_HI] = ( byte ) ( clockHi >>> 24 );
+            uuidBytes[BYTE_OFFSET_CLOCK_HI + 1] = ( byte ) ( clockHi >>> 16 );
+            uuidBytes[BYTE_OFFSET_CLOCK_MID] = ( byte ) ( clockHi >>> 8 );
+            uuidBytes[BYTE_OFFSET_CLOCK_MID + 1] = ( byte ) clockHi;
 
+            uuidBytes[BYTE_OFFSET_CLOCK_LO] = ( byte ) ( clockLo >>> 24 );
+            uuidBytes[BYTE_OFFSET_CLOCK_LO + 1] = ( byte ) ( clockLo >>> 16 );
+            uuidBytes[BYTE_OFFSET_CLOCK_LO + 2] = ( byte ) ( clockLo >>> 8 );
+            uuidBytes[BYTE_OFFSET_CLOCK_LO + 3] = ( byte ) clockLo;
 
-    /**
-     * Lame, but required
-     */
-    static {
-        try {
-            timer = new UUIDTimer( random, synchronize );
+            // Set version: keep the low nibble of the first CLOCK_HI byte and force the high nibble to 1
+            uuidBytes[BYTE_OFFSET_CLOCK_HI] &= 0x0F;
+            uuidBytes[BYTE_OFFSET_CLOCK_HI] |= 0x10;
         }
-        catch ( IOException e ) {
-            throw new RuntimeException( "Couldn't intialize timer", e );
+
+
+        /**
+         * Generate a timeuuid with the given timestamp in milliseconds and the time offset. Useful when you need to
+         * generate sequential UUIDs for the same period in time, e.g.
+         * <p/>
+         * newTimeUUID(1000, 0) <br/> newTimeUUID(1000, 1) <br /> newTimeUUID(1000, 2) <br />
+         * <p/>
+         * Only use this method if you are absolutely sure you need it. When in doubt use the method without the
+         * timestamp offset. A ts of 0 delegates to {@link #newTimeUUID()}.
+         *
+         * @param ts The timestamp in milliseconds
+         * @param timeoffset The offset, which should always be <= 10000. If you go beyond this range, the millisecond will
+         * be incremented since this is beyond the possible values when converting from millis to 1/10 microseconds stored
+         * in the time uuid.
+         * @return a UUID assembled directly from the populated bytes, keeping the timestamp, version and variant bits
+         */
+        public static UUID newTimeUUID( long ts, int timeoffset ) {
+            if ( ts == 0 ) {
+                return newTimeUUID();
+            }
+
+            byte[] uuidBytes = new byte[16];
+            // 47 bits of randomness
+            EthernetAddress eth = EthernetAddress.constructMulticastAddress();
+            eth.toByteArray( uuidBytes, 10 );
+            setTimestamp( ts, uuidBytes, getRandomClockSequence(), timeoffset );
+            // UUID.nameUUIDFromBytes would MD5-hash the bytes into a type 3 UUID, so build the UUID from the raw layout
+            final ByteBuffer raw = ByteBuffer.wrap( uuidBytes );
+            return new UUID( raw.getLong(), raw.getLong() );
         }
-    }
 
 
-    private static final TimeBasedGenerator generator = new TimeBasedGenerator( EthernetAddress.fromInterface(), timer );
+        /**
+         * Build a time UUID for the supplied millisecond timestamp. The sub-millisecond offset is taken from a shared
+         * counter that advances on every call and is reset to 0 once it passes 990, so offsets repeat after the reset.
+         */
+        public static UUID newTimeUUID( long ts ) {
+            final int slot;
+            tsLock.lock();
+            try {
+                slot = customMicrosPointer.getAndIncrement();
+                if ( slot >= 991 ) {
+                    customMicrosPointer.set( 0 );
+                }
+            }
+            finally {
+                tsLock.unlock();
+            }
+            return newTimeUUID( ts, MICROS[slot] );
+        }
 
 
-    /** Create a new time uuid */
-    public static UUID newTimeUUID() {
-        return generator.generate();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/UUIDUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/UUIDUtils.java b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/UUIDUtils.java
new file mode 100644
index 0000000..1d76e9c
--- /dev/null
+++ b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/UUIDUtils.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.model.util;
+
+
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.fasterxml.uuid.EthernetAddress;
+
+import static com.fasterxml.uuid.impl.UUIDUtil.BYTE_OFFSET_CLOCK_HI;
+import static com.fasterxml.uuid.impl.UUIDUtil.BYTE_OFFSET_CLOCK_LO;
+import static com.fasterxml.uuid.impl.UUIDUtil.BYTE_OFFSET_CLOCK_MID;
+import static com.fasterxml.uuid.impl.UUIDUtil.BYTE_OFFSET_CLOCK_SEQUENCE;
+
+
+public class UUIDUtils {
+    // NOTE(review): no members yet, and none of the imports above are used - presumably a placeholder; confirm
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/model/src/test/java/org/apache/usergrid/persistence/model/util/UUIDGeneratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/model/src/test/java/org/apache/usergrid/persistence/model/util/UUIDGeneratorTest.java b/stack/corepersistence/model/src/test/java/org/apache/usergrid/persistence/model/util/UUIDGeneratorTest.java
index ef1a9e9..738b788 100644
--- a/stack/corepersistence/model/src/test/java/org/apache/usergrid/persistence/model/util/UUIDGeneratorTest.java
+++ b/stack/corepersistence/model/src/test/java/org/apache/usergrid/persistence/model/util/UUIDGeneratorTest.java
@@ -116,4 +116,21 @@ public class UUIDGeneratorTest {
             return null;
         }
     }
+
+    @Test
+    public void testUUIDOrderingWithTimestamp(){
+        long first = 10000l;
+        long second = first+1;
+        // Two UUIDs for the same millisecond value and one for the millisecond after it
+        //ensure the values are reproducible
+        UUID firstUUID = UUIDGenerator.newTimeUUID( first );
+        UUID firstUUIDx2 = UUIDGenerator.newTimeUUID( first );
+        UUID secondUUID = UUIDGenerator.newTimeUUID( second );
+        // The earlier timestamp must sort first, checked in both argument orders
+        assertTrue(UUIDComparator.staticCompare( firstUUID, secondUUID ) < 0);
+        assertTrue(UUIDComparator.staticCompare( secondUUID, firstUUID ) > 0);
+        // NOTE(review): demands equal UUIDs for a repeated timestamp; the generator varies its offset per call - confirm
+        assertTrue(UUIDComparator.staticCompare( firstUUIDx2, firstUUID ) == 0);
+
+    }
 }