You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/12 19:00:44 UTC

[3/9] git commit: Updated tests and shard logic to be more efficient. O(1) adds vs O( log n )

Updated tests and shard logic to be more efficient.  O(1) adds vs O( log n )


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e9fa368e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e9fa368e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e9fa368e

Branch: refs/heads/USERGRID-188
Commit: e9fa368ef22140254b350e856d71a5fe1c42ef92
Parents: aae3902
Author: Todd Nine <to...@apache.org>
Authored: Tue Aug 5 13:01:04 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Tue Aug 5 13:01:04 2014 -0600

----------------------------------------------------------------------
 .../persistence/graph/guice/GraphModule.java    |    4 +-
 .../graph/serialization/impl/shard/Shard.java   |    6 +-
 .../impl/shard/ShardEntryGroup.java             |   92 +-
 .../shard/impl/ShardEntryGroupIterator.java     |   30 +-
 .../impl/ShardedEdgeSerializationImpl.java      |  138 +--
 .../graph/ComittedGraphManagerIT.java           |  138 ---
 .../graph/CommittedGraphManagerIT.java          |  138 +++
 .../impl/shard/EdgeShardSerializationTest.java  |   48 +-
 .../impl/shard/NodeShardAllocationTest.java     | 1103 +++++++++---------
 .../impl/shard/ShardEntryGroupTest.java         |  198 +++-
 .../shard/impl/ShardEntryGroupIteratorTest.java |  260 +++++
 11 files changed, 1298 insertions(+), 857 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9fa368e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 3089889..ca7c270 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -151,13 +151,15 @@ public class GraphModule extends AbstractModule {
     public EdgeSerialization storageSerialization( final NodeShardCache cache, final Keyspace keyspace,
                                                    final CassandraConfig cassandraConfig, final GraphFig graphFig,
                                                    final NodeShardApproximation shardApproximation,
+                                                   final TimeService timeService,
                                                    @StorageEdgeSerialization
                                                    final EdgeColumnFamilies edgeColumnFamilies ) {
 
         final EdgeShardStrategy sizeBasedStrategy = new SizebasedEdgeShardStrategy( cache, shardApproximation );
 
 
-        final ShardedEdgeSerialization serialization = new ShardedEdgeSerializationImpl(keyspace, cassandraConfig, graphFig, sizeBasedStrategy);
+        final ShardedEdgeSerialization serialization = new ShardedEdgeSerializationImpl(keyspace, cassandraConfig, graphFig, sizeBasedStrategy,
+                timeService );
 
 
         final EdgeSerializationImpl edgeSerialization =

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9fa368e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
index 4b58224..38fe51c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
@@ -103,12 +103,15 @@ public class Shard implements Comparable<Shard> {
         if ( this == o ) {
             return true;
         }
-        if ( !( o instanceof Shard ) ) {
+        if ( o == null || getClass() != o.getClass() ) {
             return false;
         }
 
         final Shard shard = ( Shard ) o;
 
+        if ( compacted != shard.compacted ) {
+            return false;
+        }
         if ( createdTime != shard.createdTime ) {
             return false;
         }
@@ -124,6 +127,7 @@ public class Shard implements Comparable<Shard> {
     public int hashCode() {
         int result = ( int ) ( shardIndex ^ ( shardIndex >>> 32 ) );
         result = 31 * result + ( int ) ( createdTime ^ ( createdTime >>> 32 ) );
+        result = 31 * result + ( compacted ? 1 : 0 );
         return result;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9fa368e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
index e39f86b..bea428b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -19,11 +19,15 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard;
 
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.TreeSet;
 
+import com.google.common.base.Preconditions;
+
 
 /**
  * There are cases where we need to read or write to more than 1 shard.  This object encapsulates a set of shards that
@@ -35,7 +39,7 @@ import java.util.TreeSet;
 public class ShardEntryGroup {
 
 
-    private TreeSet<Shard> shards;
+    private List<Shard> shards;
 
     private final long delta;
 
@@ -48,8 +52,9 @@ public class ShardEntryGroup {
      * The max delta we accept in milliseconds for create time to be considered a member of this group
      */
     public ShardEntryGroup( final long delta ) {
+        Preconditions.checkArgument(delta > 0, "delta must be greater than 0");
         this.delta = delta;
-        this.shards = new TreeSet<>();
+        this.shards = new ArrayList<>();
         this.maxCreatedTime = 0;
     }
 
@@ -66,41 +71,21 @@ public class ShardEntryGroup {
      */
     public boolean addShard( final Shard shard ) {
 
-        //shards can be ar
-
-        //compare the time and see if it falls withing any of the elements based on their timestamp
-        //        final long shardCreateTime = shard.getCreatedTime();
-
-        //        final Long lessThanKey = shards.floorKey( shardCreateTime );
-        //
-        //        final Long greaterThanKey = shards.ceilingKey( shardCreateTime );
-        //
-        //        //first into the set
-        //        if ( lessThanKey == null && greaterThanKey == null ) {
-        //            addShardInternal( shard );
-        //            return true;
-        //        }
-        //
-        //        if ( lessThanKey != null && shardCreateTime - lessThanKey < delta ) {
-        //            addShardInternal( shard );
-        //            return true;
-        //        }
-        //
-        //
-        //        if ( greaterThanKey != null && greaterThanKey - shardCreateTime < delta ) {
-        //            addShardInternal( shard );
-        //
-        //            return true;
-        //        }
-
-        if ( shards.size() == 0 ) {
+        Preconditions.checkNotNull( "shard cannot be null", shard );
+
+        final int size = shards.size();
+
+        if ( size  == 0 ) {
             addShardInternal( shard );
             return true;
         }
 
+        final Shard minShard = shards.get( size -1 );
+
+        Preconditions.checkArgument(minShard.compareTo(shard) > 0, "shard must be less than the current max");
 
         //shard is not compacted, or it's predecessor isn't, we should include it in this group
-        if ( !shard.isCompacted() || !shards.first().isCompacted() ) {
+        if ( !minShard.isCompacted() ) {
             addShardInternal( shard );
             return true;
         }
@@ -140,7 +125,7 @@ public class ShardEntryGroup {
          * The shards in this set can be combined, we should only write to the compaction target to avoid
          * adding data to other shards
          */
-        if ( shouldCompact( currentTime ) ) {
+        if ( !isTooSmallToCompact() && shouldCompact( currentTime ) ) {
             return Collections.singleton( getCompactionTarget() );
         }
 
@@ -160,36 +145,57 @@ public class ShardEntryGroup {
             return compactionTarget;
         }
 
+
         //we have < 2 shards, we can't compact
-        if ( shards.size() < 2 ) {
+        if (isTooSmallToCompact()) {
             return null;
         }
 
 
-        Iterator<Shard> descendingIterator = shards.iterator();
 
-        //if we don't have a next, or our "lowest" shard can't be compacted we have no nearest neighbor
-        //to use as a bookend.  We can't compact.
-        if(!descendingIterator.hasNext() || !descendingIterator.next().isCompacted()){
+        final int lastIndex = shards.size() -1;
+
+        final Shard last = shards.get( lastIndex  );
+
+        //Our oldest isn't compacted. As a result we have no "bookend" to delimit this entry group.  Therefore we can't compact
+        if(!last.isCompacted()){
             return null;
         }
 
+        //Start seeking from the end of our group.  The first shard we encounter that is not compacted is our compaction target
+        //NOTE: This does not mean we can compact, rather it's just an indication that we have a target set.
+        for(int i = lastIndex - 1; i > -1; i --){
+            final Shard compactionCandidate = shards.get( i );
 
 
-        //our next should be able to be compacted.
+            if(!compactionCandidate.isCompacted()){
+                compactionTarget = compactionCandidate;
+                break;
+            }
 
-        if(!descendingIterator.hasNext()){
-            return null;
         }
 
-        //We use this value a lot, cache it
-        compactionTarget = descendingIterator.next();
-
         return compactionTarget;
     }
 
 
     /**
+     * Return the number of entries in this shard group
+     * @return
+     */
+    public int entrySize(){
+        return shards.size();
+    }
+
+    /**
+     * Return true if there are not enough elements in this entry group to consider compaction
+     * @return
+     */
+    private boolean isTooSmallToCompact(){
+        return shards.size() < 2;
+    }
+
+    /**
      * Returns true if the newest created shard is path the currentTime - delta
      *
      * @param currentTime The current system time in milliseconds

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9fa368e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
index 8e69be4..89104e8 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
@@ -24,11 +24,19 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
 
     /**
      * Create a shard iterator
-     * @param sourceIterator The iterator of all shards.  Order is expected to be by the  shard index from Long.MAX to Long.MIN
+     * @param shardIterator The iterator of all shards.  Order is expected to be by the  shard index from Long.MAX to Long.MIN
      * @param minDelta The minimum delta we allow to consider shards the same group
      */
-    public ShardEntryGroupIterator( final Iterator<Shard> sourceIterator, final long minDelta ) {
-        this.sourceIterator = new PushbackIterator( sourceIterator );
+    public ShardEntryGroupIterator( final Iterator<Shard> shardIterator, final long minDelta ) {
+        this.sourceIterator = new PushbackIterator( shardIterator );
+
+        /**
+         * If we don't have any shards, we need to push our "MIN" shard into the list
+         */
+        if(!sourceIterator.hasNext()){
+            sourceIterator.pushback( new Shard(0, 0, true) );
+        }
+
         this.minDelta = minDelta;
     }
 
@@ -39,7 +47,7 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
             advance();
         }
 
-        return next == null;
+        return next != null;
     }
 
 
@@ -69,29 +77,29 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
      */
     private void advance() {
 
-
-        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( minDelta );
-
-
         /**
          * We loop through until we've exhausted our source, or we have 2 elements, which means
          * they're > min time allocation from one another
          */
         while ( sourceIterator.hasNext() ) {
 
+            if(next == null){
+                next = new ShardEntryGroup( minDelta );
+            }
 
             final Shard shard = sourceIterator.next();
 
 
             //we can't add this one to the entries, it doesn't fit within the delta, allocate a new one and break
-            if ( shardEntryGroup.addShard( shard ) ) {
+            if ( next.addShard( shard ) ) {
                 continue;
             }
 
-            //we can't add this shard to the current group.  Add the group and return.
-            next = shardEntryGroup;
 
             sourceIterator.pushback( shard );
+            break;
         }
+
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9fa368e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
index d2e2ddb..558d3fc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
@@ -29,6 +29,7 @@ import javax.inject.Inject;
 import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.graph.Edge;
@@ -67,22 +68,27 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
     protected final CassandraConfig cassandraConfig;
     protected final GraphFig graphFig;
     protected final EdgeShardStrategy writeEdgeShardStrategy;
+    protected final TimeService timeService;
 
 
     @Inject
     public ShardedEdgeSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
-                                         final GraphFig graphFig, final EdgeShardStrategy writeEdgeShardStrategy ) {
+                                         final GraphFig graphFig, final EdgeShardStrategy writeEdgeShardStrategy,
+                                         final TimeService timeService ) {
+
 
         checkNotNull( "keyspace required", keyspace );
         checkNotNull( "cassandraConfig required", cassandraConfig );
         checkNotNull( "consistencyFig required", graphFig );
         checkNotNull( "writeEdgeShardStrategy required", writeEdgeShardStrategy );
+        checkNotNull( "timeService required", timeService );
 
 
         this.keyspace = keyspace;
         this.cassandraConfig = cassandraConfig;
         this.graphFig = graphFig;
         this.writeEdgeShardStrategy = writeEdgeShardStrategy;
+        this.timeService = timeService;
     }
 
 
@@ -198,6 +204,8 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
          */
 
 
+        final long time = timeService.getCurrentTime();
+
         final DirectedEdge sourceEdge = new DirectedEdge( targetNodeId, timestamp );
 
         final ShardEntryGroup sourceRowKeyShard =
@@ -207,70 +215,70 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
                 columnFamilies.getSourceNodeCfName();
 
 
-//        for ( Shard shard : sourceRowKeyShard.getEntries() ) {
-//
-//            final long shardId = shard.getShardIndex();
-//            final RowKey sourceRowKey = new RowKey( sourceNodeId, type, shardId );
-//            op.writeEdge( sourceCf, sourceRowKey, sourceEdge );
-//            op.countEdge( sourceNodeId, NodeType.SOURCE, shardId, type );
-//        }
-//
-//
-//        final ShardEntryGroup sourceWithTypeRowKeyShard = writeEdgeShardStrategy
-//                .getWriteShards( scope, sourceNodeId, NodeType.SOURCE, timestamp, type, targetNodeType );
-//
-//        final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> targetCf =
-//                columnFamilies.getSourceNodeTargetTypeCfName();
-//
-//        for ( Shard shard : sourceWithTypeRowKeyShard.getEntries() ) {
-//
-//            final long shardId = shard.getShardIndex();
-//            final RowKeyType sourceRowKeyType = new RowKeyType( sourceNodeId, type, targetNodeId, shardId );
-//
-//            op.writeEdge( targetCf, sourceRowKeyType, sourceEdge );
-//            op.countEdge( sourceNodeId, NodeType.SOURCE, shardId, type, targetNodeType );
-//        }
-//
-//
-//        /**
-//         * write edges from target<-source
-//         */
-//
-//        final DirectedEdge targetEdge = new DirectedEdge( sourceNodeId, timestamp );
-//
-//
-//        final ShardEntryGroup targetRowKeyShard =
-//                writeEdgeShardStrategy.getWriteShards( scope, targetNodeId, NodeType.TARGET, timestamp, type );
-//
-//        final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> sourceByTargetCf =
-//                columnFamilies.getTargetNodeCfName();
-//
-//        for ( Shard shard : targetRowKeyShard.getEntries() ) {
-//            final long shardId = shard.getShardIndex();
-//            final RowKey targetRowKey = new RowKey( targetNodeId, type, shardId );
-//
-//            op.writeEdge( sourceByTargetCf, targetRowKey, targetEdge );
-//            op.countEdge( targetNodeId, NodeType.TARGET, shardId, type );
-//        }
-//
-//
-//        final ShardEntryGroup targetWithTypeRowKeyShard = writeEdgeShardStrategy
-//                .getWriteShards( scope, targetNodeId, NodeType.TARGET, timestamp, type, sourceNodeType );
-//
-//        final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> targetBySourceCf =
-//                columnFamilies.getTargetNodeSourceTypeCfName();
-//
-//
-//        for ( Shard shard : targetWithTypeRowKeyShard.getEntries() ) {
-//
-//            final long shardId = shard.getShardIndex();
-//
-//            final RowKeyType targetRowKeyType = new RowKeyType( targetNodeId, type, sourceNodeId, shardId );
-//
-//
-//            op.writeEdge( targetBySourceCf, targetRowKeyType, targetEdge );
-//            op.countEdge( targetNodeId, NodeType.TARGET, shardId, type, sourceNodeType );
-//        }
+        for ( Shard shard : sourceRowKeyShard.getWriteShards(time) ) {
+
+            final long shardId = shard.getShardIndex();
+            final RowKey sourceRowKey = new RowKey( sourceNodeId, type, shardId );
+            op.writeEdge( sourceCf, sourceRowKey, sourceEdge );
+            op.countEdge( sourceNodeId, NodeType.SOURCE, shardId, type );
+        }
+
+
+        final ShardEntryGroup sourceWithTypeRowKeyShard = writeEdgeShardStrategy
+                .getWriteShards( scope, sourceNodeId, NodeType.SOURCE, timestamp, type, targetNodeType );
+
+        final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> targetCf =
+                columnFamilies.getSourceNodeTargetTypeCfName();
+
+        for ( Shard shard : sourceWithTypeRowKeyShard.getWriteShards(time) ) {
+
+            final long shardId = shard.getShardIndex();
+            final RowKeyType sourceRowKeyType = new RowKeyType( sourceNodeId, type, targetNodeId, shardId );
+
+            op.writeEdge( targetCf, sourceRowKeyType, sourceEdge );
+            op.countEdge( sourceNodeId, NodeType.SOURCE, shardId, type, targetNodeType );
+        }
+
+
+        /**
+         * write edges from target<-source
+         */
+
+        final DirectedEdge targetEdge = new DirectedEdge( sourceNodeId, timestamp );
+
+
+        final ShardEntryGroup targetRowKeyShard =
+                writeEdgeShardStrategy.getWriteShards( scope, targetNodeId, NodeType.TARGET, timestamp, type );
+
+        final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> sourceByTargetCf =
+                columnFamilies.getTargetNodeCfName();
+
+        for ( Shard shard : targetRowKeyShard.getWriteShards(time) ) {
+            final long shardId = shard.getShardIndex();
+            final RowKey targetRowKey = new RowKey( targetNodeId, type, shardId );
+
+            op.writeEdge( sourceByTargetCf, targetRowKey, targetEdge );
+            op.countEdge( targetNodeId, NodeType.TARGET, shardId, type );
+        }
+
+
+        final ShardEntryGroup targetWithTypeRowKeyShard = writeEdgeShardStrategy
+                .getWriteShards( scope, targetNodeId, NodeType.TARGET, timestamp, type, sourceNodeType );
+
+        final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> targetBySourceCf =
+                columnFamilies.getTargetNodeSourceTypeCfName();
+
+
+        for ( Shard shard : targetWithTypeRowKeyShard.getWriteShards(time) ) {
+
+            final long shardId = shard.getShardIndex();
+
+            final RowKeyType targetRowKeyType = new RowKeyType( targetNodeId, type, sourceNodeId, shardId );
+
+
+            op.writeEdge( targetBySourceCf, targetRowKeyType, targetEdge );
+            op.countEdge( targetNodeId, NodeType.TARGET, shardId, type, sourceNodeType );
+        }
 
         /**
          * Always a 0l shard, we're hard limiting 2b timestamps for the same edge

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9fa368e/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/ComittedGraphManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/ComittedGraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/ComittedGraphManagerIT.java
deleted file mode 100644
index 596480d..0000000
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/ComittedGraphManagerIT.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing,
- *  * software distributed under the License is distributed on an
- *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  * KIND, either express or implied.  See the License for the
- *  * specific language governing permissions and limitations
- *  * under the License.
- *
- */
-package org.apache.usergrid.persistence.graph;
-
-
-import java.util.concurrent.TimeUnit;
-
-import org.jukito.JukitoRunner;
-import org.jukito.UseModules;
-import org.junit.runner.RunWith;
-
-import org.apache.usergrid.persistence.core.cassandra.ITRunner;
-import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-
-
-/**
- * Integration test that performs all calls immediately after writes without blocking.  Tests that our
- * view is immediately consistent to our users, even if we have yet to perform background processing
- */
-@RunWith(ITRunner.class)
-@UseModules({ TestGraphModule.class })
-public class ComittedGraphManagerIT extends GraphManagerIT {
-
-
-    @Override
-    protected GraphManager getHelper(GraphManager gm) {
-        return new ComittedGraphTestHelper( gm );
-    }
-
-
-    /**
-     * Doesn't wait for the async process to happen before returning.  Simply executes and immediately returns.
-     */
-    public static class ComittedGraphTestHelper implements GraphManager {
-
-        private final GraphManager graphManager;
-
-
-        public ComittedGraphTestHelper( final GraphManager graphManager ) {
-            this.graphManager = graphManager;
-        }
-
-
-        @Override
-        public Observable<Edge> writeEdge( final Edge edge ) {
-            return graphManager.writeEdge( edge );
-        }
-
-
-        @Override
-        public Observable<Edge> deleteEdge( final Edge edge ) {
-            return graphManager.deleteEdge( edge );
-        }
-
-
-        @Override
-        public Observable<Id> deleteNode( final Id node, final long timestamp) {
-            return graphManager.deleteNode( node, timestamp );
-        }
-
-
-        @Override
-        public Observable<Edge> loadEdgeVersions( final SearchByEdge edge ) {
-            return graphManager.loadEdgeVersions( edge );
-        }
-
-
-        @Override
-        public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
-            return graphManager.loadEdgesFromSource( search );
-        }
-
-
-        @Override
-        public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
-            return graphManager.loadEdgesToTarget( search );
-        }
-
-
-        @Override
-        public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
-            return  graphManager.loadEdgesFromSourceByType(search);
-        }
-
-
-        @Override
-        public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
-            return graphManager.loadEdgesToTargetByType( search );
-        }
-
-
-        @Override
-        public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) {
-            return graphManager.getEdgeTypesFromSource( search );
-        }
-
-
-        @Override
-        public Observable<String> getIdTypesFromSource( final SearchIdType search ) {
-            return graphManager.getIdTypesFromSource( search );
-        }
-
-
-        @Override
-        public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) {
-            return graphManager.getEdgeTypesToTarget( search );
-        }
-
-
-        @Override
-        public Observable<String> getIdTypesToTarget( final SearchIdType search ) {
-            return graphManager.getIdTypesToTarget( search );
-        }
-
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9fa368e/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java
new file mode 100644
index 0000000..3c04ab4
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java
@@ -0,0 +1,138 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.graph;
+
+
+import java.util.concurrent.TimeUnit;
+
+import org.jukito.JukitoRunner;
+import org.jukito.UseModules;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.core.cassandra.ITRunner;
+import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Integration test that performs all calls immediately after writes without blocking.  Tests that our
+ * view is immediately consistent to our users, even if we have yet to perform background processing
+ */
+@RunWith(ITRunner.class)
+@UseModules({ TestGraphModule.class })
+public class CommittedGraphManagerIT extends GraphManagerIT {
+
+
+    @Override
+    protected GraphManager getHelper(GraphManager gm) {
+        return new ComittedGraphTestHelper( gm );
+    }
+
+
+    /**
+     * Doesn't wait for the async process to happen before returning.  Simply executes and immediately returns.
+     */
+    public static class ComittedGraphTestHelper implements GraphManager {
+
+        private final GraphManager graphManager;
+
+
+        public ComittedGraphTestHelper( final GraphManager graphManager ) {
+            this.graphManager = graphManager;
+        }
+
+
+        @Override
+        public Observable<Edge> writeEdge( final Edge edge ) {
+            return graphManager.writeEdge( edge );
+        }
+
+
+        @Override
+        public Observable<Edge> deleteEdge( final Edge edge ) {
+            return graphManager.deleteEdge( edge );
+        }
+
+
+        @Override
+        public Observable<Id> deleteNode( final Id node, final long timestamp) {
+            return graphManager.deleteNode( node, timestamp );
+        }
+
+
+        @Override
+        public Observable<Edge> loadEdgeVersions( final SearchByEdge edge ) {
+            return graphManager.loadEdgeVersions( edge );
+        }
+
+
+        @Override
+        public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
+            return graphManager.loadEdgesFromSource( search );
+        }
+
+
+        @Override
+        public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
+            return graphManager.loadEdgesToTarget( search );
+        }
+
+
+        @Override
+        public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
+            return  graphManager.loadEdgesFromSourceByType(search);
+        }
+
+
+        @Override
+        public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
+            return graphManager.loadEdgesToTargetByType( search );
+        }
+
+
+        @Override
+        public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) {
+            return graphManager.getEdgeTypesFromSource( search );
+        }
+
+
+        @Override
+        public Observable<String> getIdTypesFromSource( final SearchIdType search ) {
+            return graphManager.getIdTypesFromSource( search );
+        }
+
+
+        @Override
+        public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) {
+            return graphManager.getEdgeTypesToTarget( search );
+        }
+
+
+        @Override
+        public Observable<String> getIdTypesToTarget( final SearchIdType search ) {
+            return graphManager.getIdTypesToTarget( search );
+        }
+
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9fa368e/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
index 00e0164..3d98486 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
@@ -109,33 +109,13 @@ public class EdgeShardSerializationTest {
         Iterator<Shard> results =
                 edgeShardSerialization.getShardMetaData( scope, now, NodeType.SOURCE, Optional.<Shard>absent(), types );
 
-        Shard next = results.next();
-
-        assertEquals( shard3, next.getShardIndex() );
-
-        assertEquals( shard3.getCreatedTime(), next.getCreatedTime() );
-
-        assertEquals( shard3.isCompacted(), next.isCompacted() );
-
-
-        next = results.next();
-
-
-        assertEquals( shard2, next.getShardIndex() );
-
-        assertEquals( shard2.getCreatedTime(), next.getCreatedTime() );
-
-        assertEquals( shard2.isCompacted(), next.isCompacted() );
-
-
-        next = results.next();
 
+        assertEquals( shard3, results.next() );
 
-        assertEquals( shard1, next.getShardIndex() );
+        assertEquals( shard2, results.next() );
 
-        assertEquals( shard1.getCreatedTime(), next.getCreatedTime() );
+        assertEquals( shard1, results.next() );
 
-        assertEquals( shard1.isCompacted(), next.isCompacted() );
 
         assertFalse( results.hasNext() );
 
@@ -149,24 +129,10 @@ public class EdgeShardSerializationTest {
         //test paging and size
         results = edgeShardSerialization.getShardMetaData( scope, now, NodeType.SOURCE, Optional.of( shard2 ), types );
 
-        next = results.next();
-
-
-        assertEquals( shard2, next.getShardIndex() );
-
-        assertEquals( shard2.getCreatedTime(), next.getCreatedTime() );
-
-        assertEquals( shard2.isCompacted(), next.isCompacted() );
-
-
-        next = results.next();
-
-
-        assertEquals( shard1, next.getShardIndex() );
+        assertEquals( shard2, results.next() );
 
-        assertEquals( shard1.getCreatedTime(), next.getCreatedTime() );
 
-        assertEquals( shard1.isCompacted(), next.isCompacted() );
+        assertEquals( shard1, results.next() );
 
 
         assertFalse( results.hasNext() );
@@ -225,9 +191,9 @@ public class EdgeShardSerializationTest {
         results =
                 edgeShardSerialization.getShardMetaData( scope, now, NodeType.SOURCE, Optional.<Shard>absent(), types );
 
-        assertEquals( shard3, results.next().getShardIndex() );
+        assertEquals( shard3, results.next() );
 
-        assertEquals( shard2, results.next().getShardIndex() );
+        assertEquals( shard2, results.next() );
 
         assertFalse( results.hasNext() );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9fa368e/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index 0ac7f78..a31a8ad 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -86,556 +86,555 @@ public class NodeShardAllocationTest {
     }
 
 
-//    @Test
-//    public void minTime() {
-//        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
-//
-//        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
-//
-//        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
-//
-//        final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
-//
-//
-//        final TimeService timeService = mock( TimeService.class );
-//
-//        final Keyspace keyspace = mock( Keyspace.class );
-//
-//
-//        NodeShardAllocation approximation =
-//                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-//                        nodeShardCounterSerialization, timeService, graphFig, keyspace );
-//
-//
-//        final long timeservicetime = System.currentTimeMillis();
-//
-//        when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
-//
-//        final long expected = timeservicetime - 2 * graphFig.getShardCacheTimeout();
-//
-//        final long returned = approximation.getMinTime();
-//
-//        assertEquals( "Correct time was returned", expected, returned );
-//    }
-//
-//
-//    @Test
-//    public void noShards() {
-//        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
-//
-//        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
-//
-//        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
-//
-//
-//        final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
-//
-//
-//        final TimeService timeService = mock( TimeService.class );
-//
-//        final Keyspace keyspace = mock( Keyspace.class );
-//
-//        final MutationBatch batch = mock( MutationBatch.class );
-//
-//        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
-//
-//        NodeShardAllocation approximation =
-//                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-//                        nodeShardCounterSerialization, timeService, graphFig, keyspace );
-//
-//        final Id nodeId = createId( "test" );
-//        final String type = "type";
-//        final String subType = "subType";
-//
-//        /**
-//         * Mock up returning an empty iterator, our audit shouldn't create a new shard
-//         */
-//        when( edgeShardSerialization
-//                .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.SOURCE ), any( Optional.class ),
-//                        same( type ), same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
-//
-//        final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.SOURCE, type, subType );
-//
-//        assertFalse( "No shard allocated", result );
-//    }
-//
-//
-//    @Test
-//    public void existingFutureShardSameTime() {
-//        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
-//
-//        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
-//
-//        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
-//
-//        final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
-//
-//
-//        final TimeService timeService = mock( TimeService.class );
-//
-//        final Keyspace keyspace = mock( Keyspace.class );
-//
-//
-//        final MutationBatch batch = mock( MutationBatch.class );
-//
-//        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
-//
-//
-//        NodeShardAllocation approximation =
-//                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-//                        nodeShardCounterSerialization, timeService, graphFig, keyspace );
-//
-//        final Id nodeId = createId( "test" );
-//        final String type = "type";
-//        final String subType = "subType";
-//
-//
-//        final long timeservicetime = System.currentTimeMillis();
-//
-//        when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
-//
-//        final Shard futureShard = new Shard( 10000l, timeservicetime, compacted );
-//
-//        /**
-//         * Mock up returning a min shard, and a future shard
-//         */
-//        when( edgeShardSerialization
-//                .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
-//                        same( type ), same( subType ) ) ).thenReturn( Arrays.asList( futureShard ).iterator() );
-//
-//        final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.TARGET, type, subType );
-//
-//        assertFalse( "No shard allocated", result );
-//    }
-//
-//
-//    @Test
-//    public void lowCountFutureShard() {
-//        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
-//
-//        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
-//
-//        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
-//
-//        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-//
-//
-//        final TimeService timeService = mock( TimeService.class );
-//
-//        final Keyspace keyspace = mock( Keyspace.class );
-//
-//        final MutationBatch batch = mock( MutationBatch.class );
-//
-//        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
-//
-//
-//        NodeShardAllocation approximation =
-//                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-//                        nodeShardApproximation, timeService, graphFig, keyspace );
-//
-//        final Id nodeId = createId( "test" );
-//        final String type = "type";
-//        final String subType = "subType";
-//
-//
-//        final long timeservicetime = System.currentTimeMillis();
-//
-//        when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
-//
-//
-//        /**
-//         * Mock up returning a min shard, and a future shard
-//         */
-//        when( edgeShardSerialization
-//                .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
-//                        same( type ), same( subType ) ) ).thenReturn( Arrays.asList( new Shard( 0l, 0l, compacted ) ).iterator() );
-//
-//
-//        //return a shard size < our max by 1
-//
-//        final long count = graphFig.getShardSize() - 1;
-//
-//        when( nodeShardApproximation.getCount( scope, nodeId, NodeType.TARGET, 0l, type, subType ) )
-//                .thenReturn( count );
-//
-//        final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.TARGET, type, subType );
-//
-//        assertFalse( "Shard allocated", result );
-//    }
-//
-//
-//    @Test
-//    public void equalCountFutureShard() {
-//        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
-//
-//        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
-//
-//        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
-//
-//        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-//
-//
-//        final TimeService timeService = mock( TimeService.class );
-//
-//        final Keyspace keyspace = mock( Keyspace.class );
-//
-//        final MutationBatch batch = mock( MutationBatch.class );
-//
-//        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
-//
-//
-//        NodeShardAllocation approximation =
-//                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-//                        nodeShardApproximation, timeService, graphFig, keyspace );
-//
-//        final Id nodeId = createId( "test" );
-//        final String type = "type";
-//        final String subType = "subType";
-//
-//
-//        final long timeservicetime = System.currentTimeMillis();
-//
-//        when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
-//
-//
-//        /**
-//         * Mock up returning a min shard
-//         */
-//        when( edgeShardSerialization
-//                .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.SOURCE ), any( Optional.class ),
-//                        same( type ), same( subType ) ) ).thenReturn( Arrays.asList( new Shard( 0l, 0l, compacted ) ).iterator() );
-//
-//
-//        final long shardCount = graphFig.getShardSize();
-//
-//        //return a shard size equal to our max
-//        when( nodeShardApproximation.getCount( scope, nodeId, NodeType.SOURCE, 0l, type, subType ) )
-//                .thenReturn( shardCount );
-//
-//        ArgumentCaptor<Long> shardValue = ArgumentCaptor.forClass( Long.class );
-//        ArgumentCaptor<Long> timestampValue = ArgumentCaptor.forClass( Long.class );
-//
-//
-//        //mock up our mutation
-//        when( edgeShardSerialization
-//                .writeShardMeta( same( scope ), same( nodeId ), eq( NodeType.SOURCE ), shardValue.capture(),
-//                        timestampValue.capture(), same( type ), same( subType ) ) )
-//                .thenReturn( mock( MutationBatch.class ) );
-//
-//
-//        final SimpleMarkedEdge returnedEdge =
-//                new SimpleMarkedEdge( nodeId, type, createId( "subType" ), 10005l, false );
-//        final Iterator<MarkedEdge> edgeIterator = Collections.singleton( ( MarkedEdge ) returnedEdge ).iterator();
-//
-//        //mock up returning the value
-//        when( shardedEdgeSerialization
-//                .getEdgesFromSourceByTargetType( same( edgeColumnFamilies ), same( scope ), any( SearchByIdType.class ),
-//                        any( Iterator.class ) ) ).thenReturn( edgeIterator );
-//
-//
-//        final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.SOURCE, type, subType );
-//
-//        assertTrue( "Shard allocated", result );
-//
-//        //check our new allocated UUID
-//
-//
-//        final long savedTimestamp = timestampValue.getValue();
-//
-//
-//        assertEquals( "Expected time service time", timeservicetime, savedTimestamp );
-//
-//
-//        //now check our max value was set
-//
-//        final long savedShardPivot = shardValue.getValue();
-//
-//        assertEquals( "Expected max value to be the same", returnedEdge.getTimestamp(), savedShardPivot );
-//    }
-//
-//
-//    @Test
-//    public void futureCountShardCleanup() {
-//        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
-//
-//        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
-//
-//        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
-//
-//        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-//
-//
-//        final TimeService timeService = mock( TimeService.class );
-//
-//        final Keyspace keyspace = mock( Keyspace.class );
-//
-//        final MutationBatch batch = mock( MutationBatch.class );
-//
-//        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
-//
-//
-//        NodeShardAllocation approximation =
-//                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-//                        nodeShardApproximation, timeService, graphFig, keyspace );
-//
-//        final Id nodeId = createId( "test" );
-//        final String type = "type";
-//        final String subType = "subType";
-//
-//
-//        /**
-//         * Use the time service to generate timestamps
-//         */
-//        final long timeservicetime = 10000;
-//
-//
-//        when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
-//
-//        assertTrue( "Shard cache mocked", graphFig.getShardCacheTimeout() > 0 );
-//
-//
-//        /**
-//         * Simulates clock drift when 2 nodes create future shards near one another
-//         */
-//        final long minDelta = graphFig.getShardMinDelta();
-//
-//
-//        final Shard minShard = new Shard( 0l, 0l, compacted );
-//
-//        //a shard that isn't our minimum, but exists after compaction
-//        final Shard compactedShard = new Shard( 5000, 1000, compacted );
-//
-//        /**
-//         * Simulate different node time allocation
-//         */
-//
-//        final long minTime = 10000;
-//        //our second shard is the "oldest", and hence should be returned in the iterator.  Future shard 1 and 3
-//        // should be removed
-//
-//        //this should get dropped, It's allocated after future shard2 even though the time is less
-//        final Shard futureShard1 = new Shard( 10000, minTime + minDelta, compacted );
-//
-//        //should get kept.
-//        final Shard futureShard2 = new Shard( 10005, minTime, compacted );
-//
-//        //should be removed
-//        final Shard futureShard3 = new Shard( 10010, minTime + minDelta / 2, compacted );
-//
-//        /**
-//         * Mock up returning a min shard
-//         */
-//        when( edgeShardSerialization
-//                .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
-//                        same( type ), same( subType ) ) ).thenReturn(
-//                Arrays.asList( futureShard3, futureShard2, futureShard1, compactedShard, minShard ).iterator() );
-//
-//
-//        ArgumentCaptor<Long> newLongValue = ArgumentCaptor.forClass( Long.class );
-//
-//
-//        //mock up our mutation
-//        when( edgeShardSerialization
-//                .removeShardMeta( same( scope ), same( nodeId ), eq( NodeType.TARGET ), newLongValue.capture(),
-//                        same( type ), same( subType ) ) ).thenReturn( mock( MutationBatch.class ) );
-//
-//
-//        final Iterator<ShardEntryGroup> result =
-//                approximation.getShards( scope, nodeId, NodeType.TARGET, Optional.<Shard>absent(), type, subType );
-//
-//
-//        assertTrue( "Shards present", result.hasNext() );
-//
-//
-//        ShardEntryGroup shardEntryGroup = result.next();
-//
-//        assertEquals( "Future shard returned", futureShard1, shardEntryGroup.getCompactionTarget() );
-//
-//
-//        //now verify all 4 are in this group.  This is because the first shard (0,0) (n-1_ may be the only shard other
-//        //nodes see while we're rolling our state.  This means it should be read and merged from as well
-//
-//        Collection<Shard> writeShards = shardEntryGroup.getWriteShards();
-//
-//        assertEquals( "Shard size as expected", 4, writeShards.size() );
-//
-//        assertTrue( writeShards.contains( futureShard1 ) );
-//        assertTrue( writeShards.contains( futureShard2 ) );
-//        assertTrue( writeShards.contains( futureShard3 ) );
-//        assertTrue( writeShards.contains( compactedShard ) );
-//
-//
-//        Collection<Shard> readShards = shardEntryGroup.getReadShards( minTime + minDelta );
-//
-//        assertEquals( "Shard size as expected", 4, readShards.size() );
-//
-//        assertTrue( readShards.contains( futureShard1 ) );
-//        assertTrue( readShards.contains( futureShard2 ) );
-//        assertTrue( readShards.contains( futureShard3 ) );
-//        assertTrue( readShards.contains( compactedShard ) );
-//
-//
-//        assertTrue( "Shards present", result.hasNext() );
-//
-//        shardEntryGroup = result.next();
-//
-//        writeShards = shardEntryGroup.getWriteShards();
-//
-//
-//        assertTrue( "Previous shard present", writeShards.contains( minShard ) );
-//
-//
-//        writeShards = shardEntryGroup.getReadShards( minTime + minDelta );
-//
-//
-//        assertTrue( "Previous shard present", writeShards.contains( minShard ) );
-//
-//
-//        assertFalse( "No shards left", result.hasNext() );
-//    }
-//
-//
-//    @Test
-//    public void noShardsReturns() {
-//        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
-//
-//        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
-//
-//        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
-//
-//        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-//
-//
-//        final TimeService timeService = mock( TimeService.class );
-//
-//        when( timeService.getCurrentTime() ).thenReturn( 10000l );
-//
-//        final Keyspace keyspace = mock( Keyspace.class );
-//
-//        final MutationBatch batch = mock( MutationBatch.class );
-//
-//        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
-//
-//        NodeShardAllocation approximation =
-//                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-//                        nodeShardApproximation, timeService, graphFig, keyspace );
-//
-//        final Id nodeId = createId( "test" );
-//        final String type = "type";
-//        final String subType = "subType";
-//
-//        /**
-//         * Mock up returning an empty iterator, our audit shouldn't create a new shard
-//         */
-//        when( edgeShardSerialization
-//                .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
-//                        same( type ), same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
-//
-//        final Iterator<ShardEntryGroup> result =
-//                approximation.getShards( scope, nodeId, NodeType.TARGET, Optional.<Shard>absent(), type, subType );
-//
-//
-//        ShardEntryGroup shardEntryGroup = result.next();
-//
-//        final Shard expected = new Shard( 0, 0, compacted );
-//
-//        assertEquals( "Future shard returned", expected, shardEntryGroup.getCompactionTarget() );
-//
-//
-//        //now verify all 4 are in this group.  This is because the first shard (0,0) (n-1_ may be the only shard other
-//        //nodes see while we're rolling our state.  This means it should be read and merged from as well
-//
-//        Collection<Shard> writeShards = shardEntryGroup.getWriteShards();
-//
-//        Collection<Shard> readShards = shardEntryGroup.getReadShards( 10000l );
-//
-//
-//        assertTrue( "0 shard allocated", writeShards.contains( expected ) );
-//
-//        assertTrue( "0 shard allocated", readShards.contains( expected ) );
-//
-//
-//        assertFalse( "No shard allocated", result.hasNext() );
-//    }
-//
-//
-//    @Test
-//    public void invalidConfiguration() {
-//
-//        final GraphFig graphFig = mock( GraphFig.class );
-//
-//        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
-//
-//        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
-//
-//        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
-//
-//        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-//
-//
-//        /**
-//         * Return 100000 milliseconds
-//         */
-//        final TimeService timeService = mock( TimeService.class );
-//
-//        final long time = 100000l;
-//
-//        when( timeService.getCurrentTime() ).thenReturn( time );
-//
-//
-//        final long cacheTimeout = 30000l;
-//
-//        when( graphFig.getShardCacheTimeout() ).thenReturn( 30000l );
-//
-//
-//        final long tooSmallDelta = ( long ) ( ( cacheTimeout * 2 ) * .99 );
-//
-//        when( graphFig.getShardMinDelta() ).thenReturn( tooSmallDelta );
-//
-//
-//        final Keyspace keyspace = mock( Keyspace.class );
-//
-//        final MutationBatch batch = mock( MutationBatch.class );
-//
-//        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
-//
-//
-//        NodeShardAllocation approximation =
-//                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-//                        nodeShardApproximation, timeService, graphFig, keyspace );
-//
-//
-//        /**
-//         * Should throw an exception
-//         */
-//        try {
-//            approximation.getMinTime();
-//            fail( "Should have thrown a GraphRuntimeException" );
-//        }
-//        catch ( GraphRuntimeException gre ) {
-//            //swallow
-//        }
-//
-//        //now test something that passes.
-//
-//        final long minDelta = cacheTimeout * 2;
-//
-//        when( graphFig.getShardMinDelta() ).thenReturn( minDelta );
-//
-//        long returned = approximation.getMinTime();
-//
-//        long expectedReturned = time - minDelta;
-//
-//        assertEquals( expectedReturned, returned );
-//
-//        final long delta = cacheTimeout * 4;
-//
-//        when( graphFig.getShardMinDelta() ).thenReturn( delta );
-//
-//        returned = approximation.getMinTime();
-//
-//        expectedReturned = time - delta;
-//
-//        assertEquals( expectedReturned, returned );
-//    }
+    @Test
+    public void minTime() {
+        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+        final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
+
+
+        final TimeService timeService = mock( TimeService.class );
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+
+        NodeShardAllocation approximation =
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardCounterSerialization, timeService, graphFig, keyspace );
+
+
+        final long timeservicetime = System.currentTimeMillis();
+
+        when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
+
+        final long expected = timeservicetime - 2 * graphFig.getShardCacheTimeout();
+
+        final long returned = approximation.getMinTime();
+
+        assertEquals( "Correct time was returned", expected, returned );
+    }
+
+
+    @Test
+    public void noShards() {
+        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+
+        final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
+
+
+        final TimeService timeService = mock( TimeService.class );
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch batch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+        NodeShardAllocation approximation =
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardCounterSerialization, timeService, graphFig, keyspace );
+
+        final Id nodeId = createId( "test" );
+        final String type = "type";
+        final String subType = "subType";
+
+        /**
+         * Mock up returning an empty iterator, our audit shouldn't create a new shard
+         */
+        when( edgeShardSerialization
+                .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.SOURCE ), any( Optional.class ),
+                        same( type ), same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
+
+        final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.SOURCE, type, subType );
+
+        assertFalse( "No shard allocated", result );
+    }
+
+
+    @Test
+    public void existingFutureShardSameTime() {
+        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+        final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
+
+
+        final TimeService timeService = mock( TimeService.class );
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+
+        final MutationBatch batch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+
+        NodeShardAllocation approximation =
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardCounterSerialization, timeService, graphFig, keyspace );
+
+        final Id nodeId = createId( "test" );
+        final String type = "type";
+        final String subType = "subType";
+
+
+        final long timeservicetime = System.currentTimeMillis();
+
+        when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
+
+        final Shard futureShard = new Shard( 10000l, timeservicetime, true );
+
+        /**
+         * Mock up returning a min shard, and a future shard
+         */
+        when( edgeShardSerialization
+                .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
+                        same( type ), same( subType ) ) ).thenReturn( Arrays.asList( futureShard ).iterator() );
+
+        final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.TARGET, type, subType );
+
+        assertFalse( "No shard allocated", result );
+    }
+
+
+    @Test
+    public void lowCountFutureShard() {
+        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
+
+
+        final TimeService timeService = mock( TimeService.class );
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch batch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+
+        NodeShardAllocation approximation =
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardApproximation, timeService, graphFig, keyspace );
+
+        final Id nodeId = createId( "test" );
+        final String type = "type";
+        final String subType = "subType";
+
+
+        final long timeservicetime = System.currentTimeMillis();
+
+        when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
+
+
+        /**
+         * Mock up returning a min shard, and a future shard
+         */
+        when( edgeShardSerialization
+                .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
+                        same( type ), same( subType ) ) ).thenReturn( Arrays.asList( new Shard( 0l, 0l, true ) ).iterator() );
+
+
+        //return a shard size < our max by 1
+
+        final long count = graphFig.getShardSize() - 1;
+
+        when( nodeShardApproximation.getCount( scope, nodeId, NodeType.TARGET, 0l, type, subType ) )
+                .thenReturn( count );
+
+        final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.TARGET, type, subType );
+
+        assertFalse( "Shard allocated", result );
+    }
+
+
+    @Test
+    public void equalCountFutureShard() {
+        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
+
+
+        final TimeService timeService = mock( TimeService.class );
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch batch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+
+        NodeShardAllocation approximation =
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardApproximation, timeService, graphFig, keyspace );
+
+        final Id nodeId = createId( "test" );
+        final String type = "type";
+        final String subType = "subType";
+
+
+        final long timeservicetime = System.currentTimeMillis();
+
+        when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
+
+
+        /**
+         * Mock up returning a min shard
+         */
+        when( edgeShardSerialization
+                .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.SOURCE ), any( Optional.class ),
+                        same( type ), same( subType ) ) ).thenReturn( Arrays.asList( new Shard( 0l, 0l, true ) ).iterator() );
+
+
+        final long shardCount = graphFig.getShardSize();
+
+        //return a shard size equal to our max
+        when( nodeShardApproximation.getCount( scope, nodeId, NodeType.SOURCE, 0l, type, subType ) )
+                .thenReturn( shardCount );
+
+        ArgumentCaptor<Shard> shardValue = ArgumentCaptor.forClass( Shard.class );
+
+
+        //mock up our mutation
+        when( edgeShardSerialization
+                .writeShardMeta( same( scope ), same( nodeId ), eq( NodeType.SOURCE ), shardValue.capture(), same( type ), same( subType ) ) )
+                .thenReturn( mock( MutationBatch.class ) );
+
+
+        final SimpleMarkedEdge returnedEdge =
+                new SimpleMarkedEdge( nodeId, type, createId( "subType" ), 10005l, false );
+        final Iterator<MarkedEdge> edgeIterator = Collections.singleton( ( MarkedEdge ) returnedEdge ).iterator();
+
+        //mock up returning the value
+        when( shardedEdgeSerialization
+                .getEdgesFromSourceByTargetType( same( edgeColumnFamilies ), same( scope ), any( SearchByIdType.class ),
+                        any( Iterator.class ) ) ).thenReturn( edgeIterator );
+
+
+        final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.SOURCE, type, subType );
+
+        assertTrue( "Shard allocated", result );
+
+        //check our new allocated UUID
+
+
+        final long savedTimestamp = shardValue.getValue().getCreatedTime();
+
+
+        assertEquals( "Expected time service time", timeservicetime, savedTimestamp );
+
+
+        //now check our max value was set
+
+        final long savedShardPivot = shardValue.getValue().getShardIndex();
+
+        assertEquals( "Expected max value to be the same", returnedEdge.getTimestamp(), savedShardPivot );
+    }
+
+
+    @Test
+    public void futureCountShardCleanup() {
+               final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
+
+
+        final TimeService timeService = mock( TimeService.class );
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch batch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+
+        NodeShardAllocation approximation =
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardApproximation, timeService, graphFig, keyspace );
+
+        final Id nodeId = createId( "test" );
+        final String type = "type";
+        final String subType = "subType";
+
+
+        /**
+         * Use the time service to generate timestamps
+         */
+        final long timeservicetime = 10000;
+
+
+        when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
+
+        assertTrue( "Shard cache mocked", graphFig.getShardCacheTimeout() > 0 );
+
+
+        /**
+         * Simulates clock drift when 2 nodes create future shards near one another
+         */
+        final long minDelta = graphFig.getShardMinDelta();
+
+
+        final Shard minShard = new Shard( 0l, 0l, true );
+
+        //a shard that isn't our minimum, but exists after compaction
+        final Shard compactedShard = new Shard( 5000, 1000, true );
+
+        /**
+         * Simulate different node time allocation
+         */
+
+        final long minTime = 10000;
+        //our second shard is the "oldest", and hence should be returned in the iterator.  Future shard 1 and 3
+        // should be removed
+
+        //this should get dropped, It's allocated after future shard2 even though the time is less
+        final Shard futureShard1 = new Shard( 10000, minTime + minDelta, false );
+
+        //should get kept.
+        final Shard futureShard2 = new Shard( 10005, minTime, false );
+
+        //should be removed
+        final Shard futureShard3 = new Shard( 10010, minTime + minDelta / 2, false );
+
+        /**
+         * Mock up returning a min shard
+         */
+        when( edgeShardSerialization
+                .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
+                        same( type ), same( subType ) ) ).thenReturn(
+                Arrays.asList( futureShard3, futureShard2, futureShard1, compactedShard, minShard ).iterator() );
+
+
+        ArgumentCaptor<Shard> newLongValue = ArgumentCaptor.forClass( Shard.class );
+
+
+        //mock up our mutation
+        when( edgeShardSerialization
+                .removeShardMeta( same( scope ), same( nodeId ), eq( NodeType.TARGET ), newLongValue.capture(),
+                        same( type ), same( subType ) ) ).thenReturn( mock( MutationBatch.class ) );
+
+
+        final Iterator<ShardEntryGroup> result =
+                approximation.getShards( scope, nodeId, NodeType.TARGET, Optional.<Shard>absent(), type, subType );
+
+
+        assertTrue( "Shards present", result.hasNext() );
+
+
+        ShardEntryGroup shardEntryGroup = result.next();
+
+        assertEquals( "Future shard returned", futureShard1, shardEntryGroup.getCompactionTarget() );
+
+
+        //now verify all 4 are in this group.  This is because the first shard (0,0) (n-1_ may be the only shard other
+        //nodes see while we're rolling our state.  This means it should be read and merged from as well
+
+        Collection<Shard> writeShards = shardEntryGroup.getWriteShards(minTime + minDelta);
+
+        assertEquals( "Shard size as expected", 4, writeShards.size() );
+
+        assertTrue( writeShards.contains( futureShard1 ) );
+        assertTrue( writeShards.contains( futureShard2 ) );
+        assertTrue( writeShards.contains( futureShard3 ) );
+        assertTrue( writeShards.contains( compactedShard ) );
+
+
+        Collection<Shard> readShards = shardEntryGroup.getReadShards( );
+
+        assertEquals( "Shard size as expected", 4, readShards.size() );
+
+        assertTrue( readShards.contains( futureShard1 ) );
+        assertTrue( readShards.contains( futureShard2 ) );
+        assertTrue( readShards.contains( futureShard3 ) );
+        assertTrue( readShards.contains( compactedShard ) );
+
+
+        assertTrue( "Shards present", result.hasNext() );
+
+        shardEntryGroup = result.next();
+
+
+        writeShards = shardEntryGroup.getWriteShards(minTime + minDelta);
+
+
+        assertTrue( "Previous shard present", writeShards.contains( minShard ) );
+
+
+        writeShards = shardEntryGroup.getReadShards();
+
+
+        assertTrue( "Previous shard present", writeShards.contains( minShard ) );
+
+
+        assertFalse( "No shards left", result.hasNext() );
+    }
+
+
+    @Test
+    public void noShardsReturns() {
+        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
+
+
+        final TimeService timeService = mock( TimeService.class );
+
+        when( timeService.getCurrentTime() ).thenReturn( 10000l );
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch batch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+        NodeShardAllocation approximation =
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardApproximation, timeService, graphFig, keyspace );
+
+        final Id nodeId = createId( "test" );
+        final String type = "type";
+        final String subType = "subType";
+
+        /**
+         * Mock up returning an empty iterator, our audit shouldn't create a new shard
+         */
+        when( edgeShardSerialization
+                .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
+                        same( type ), same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
+
+        final Iterator<ShardEntryGroup> result =
+                approximation.getShards( scope, nodeId, NodeType.TARGET, Optional.<Shard>absent(), type, subType );
+
+
+        ShardEntryGroup shardEntryGroup = result.next();
+
+        final Shard expected = new Shard( 0, 0, false );
+
+        assertEquals( "Future shard returned", expected, shardEntryGroup.getCompactionTarget() );
+
+
+        //now verify all 4 are in this group.  This is because the first shard (0,0) (n-1_ may be the only shard other
+        //nodes see while we're rolling our state.  This means it should be read and merged from as well
+
+        Collection<Shard> writeShards = shardEntryGroup.getWriteShards(timeService.getCurrentTime());
+
+        Collection<Shard> readShards = shardEntryGroup.getReadShards( );
+
+
+        assertTrue( "0 shard allocated", writeShards.contains( expected ) );
+
+        assertTrue( "0 shard allocated", readShards.contains( expected ) );
+
+
+        assertFalse( "No shard allocated", result.hasNext() );
+    }
+
+
+    @Test
+    public void invalidConfiguration() {
+
+        final GraphFig graphFig = mock( GraphFig.class );
+
+        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
+
+
+        /**
+         * Return 100000 milliseconds
+         */
+        final TimeService timeService = mock( TimeService.class );
+
+        final long time = 100000l;
+
+        when( timeService.getCurrentTime() ).thenReturn( time );
+
+
+        final long cacheTimeout = 30000l;
+
+        when( graphFig.getShardCacheTimeout() ).thenReturn( 30000l );
+
+
+        final long tooSmallDelta = ( long ) ( ( cacheTimeout * 2 ) * .99 );
+
+        when( graphFig.getShardMinDelta() ).thenReturn( tooSmallDelta );
+
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch batch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+
+        NodeShardAllocation approximation =
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardApproximation, timeService, graphFig, keyspace );
+
+
+        /**
+         * Should throw an exception
+         */
+        try {
+            approximation.getMinTime();
+            fail( "Should have thrown a GraphRuntimeException" );
+        }
+        catch ( GraphRuntimeException gre ) {
+            //swallow
+        }
+
+        //now test something that passes.
+
+        final long minDelta = cacheTimeout * 2;
+
+        when( graphFig.getShardMinDelta() ).thenReturn( minDelta );
+
+        long returned = approximation.getMinTime();
+
+        long expectedReturned = time - minDelta;
+
+        assertEquals( expectedReturned, returned );
+
+        final long delta = cacheTimeout * 4;
+
+        when( graphFig.getShardMinDelta() ).thenReturn( delta );
+
+        returned = approximation.getMinTime();
+
+        expectedReturned = time - delta;
+
+        assertEquals( expectedReturned, returned );
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9fa368e/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
index a898e5c..b08da9a 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
@@ -19,6 +19,9 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard;
 
 
+import java.util.Collection;
+import java.util.Set;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -65,11 +68,11 @@ public class ShardEntryGroupTest {
 
         ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
 
-        boolean result = shardEntryGroup.addShard( firstShard );
+        boolean result = shardEntryGroup.addShard( secondShard );
 
         assertTrue( "Shard added", result );
 
-        result = shardEntryGroup.addShard( secondShard );
+        result = shardEntryGroup.addShard( firstShard );
 
         assertTrue( " Shard added", result );
 
@@ -102,7 +105,7 @@ public class ShardEntryGroupTest {
 
         ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
 
-        boolean result = shardEntryGroup.addShard( compactedShard );
+        boolean result = shardEntryGroup.addShard( secondShard );
 
 
         assertTrue( "Shard added", result );
@@ -111,7 +114,7 @@ public class ShardEntryGroupTest {
 
         assertTrue( "Shard added", result );
 
-        result = shardEntryGroup.addShard( secondShard );
+        result = shardEntryGroup.addShard( compactedShard );
 
         assertTrue( " Shard added", result );
 
@@ -224,10 +227,195 @@ public class ShardEntryGroupTest {
                 shardEntryGroup.shouldCompact( secondShard.getCreatedTime() + delta ) );
 
 
-        assertFalse( "Merge cannot be run within min time", shardEntryGroup.shouldCompact( secondShard.getCreatedTime() + delta + 1 ) );
+        assertFalse( "Merge cannot be run within min time",
+                shardEntryGroup.shouldCompact( secondShard.getCreatedTime() + delta + 1 ) );
 
         assertTrue( "Merge should be run with after min time", shardEntryGroup.shouldCompact( firstShard.getCreatedTime() + delta + 1 ) );
     }
+
+
+    /**
+     * Ensures that we read from all shards (even the compacted one)
+     */
+    @Test
+    public void getAllReadShards() {
+
+        final long delta = 10000;
+
+        Shard firstShard = new Shard( 1000, 10000, false );
+
+        Shard secondShard = new Shard( 999, 9000, false );
+
+        Shard compactedShard1 = new Shard( 900, 8000, true );
+
+
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+        boolean result = shardEntryGroup.addShard( firstShard );
+
+        assertTrue( "Shard added", result );
+
+        result = shardEntryGroup.addShard( secondShard );
+
+        assertTrue( " Shard added", result );
+
+        result = shardEntryGroup.addShard( compactedShard1 );
+
+        assertTrue( "Shard added", result );
+
+        Collection<Shard> readShards = shardEntryGroup.getReadShards();
+
+        assertEquals("Shard size correct", 3, readShards.size());
+
+        assertTrue("First shard present",  readShards.contains( firstShard ) );
+
+        assertTrue("Second shard present",  readShards.contains( firstShard ) );
+
+        assertTrue("Third shard present",  readShards.contains( firstShard ) );
+
+    }
+
+
+    /**
+     * Ensures that we read from all shards (even the compacted one)
+     */
+    @Test
+    public void getAllWriteShardsNotPastCompaction() {
+
+        final long delta = 10000;
+
+        Shard firstShard = new Shard( 1000, 10000, false );
+
+        Shard secondShard = new Shard( 999, 9000, false );
+
+        Shard compactedShard = new Shard( 900, 8000, true );
+
+
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+        boolean result = shardEntryGroup.addShard( firstShard );
+
+        assertTrue( "Shard added", result );
+
+        result = shardEntryGroup.addShard( secondShard );
+
+        assertTrue( " Shard added", result );
+
+        result = shardEntryGroup.addShard( compactedShard );
+
+        assertTrue( "Shard added", result );
+
+
+
+        Collection<Shard> writeShards = shardEntryGroup.getWriteShards( firstShard.getCreatedTime() + delta );
+
+        assertEquals("Shard size correct", 3, writeShards.size());
+
+        assertTrue("First shard present",  writeShards.contains( firstShard ) );
+
+        assertTrue("Second shard present",  writeShards.contains( secondShard ) );
+
+        assertTrue("Third shard present",  writeShards.contains( compactedShard ) );
+
+
+
+        writeShards = shardEntryGroup.getWriteShards(secondShard.getCreatedTime()+delta);
+
+        assertEquals("Shard size correct", 3, writeShards.size());
+
+        assertTrue("First shard present",  writeShards.contains( firstShard ) );
+
+        assertTrue("Second shard present",  writeShards.contains( secondShard ) );
+
+        assertTrue("Third shard present",  writeShards.contains( compactedShard ) );
+
+
+        /**
+         * Not the max created timestamp, shouldn't return less than all shards
+         */
+        writeShards = shardEntryGroup.getWriteShards(secondShard.getCreatedTime() +1 + delta);
+
+        assertEquals("Shard size correct", 3, writeShards.size());
+
+        assertTrue("First shard present",  writeShards.contains( firstShard ) );
+
+        assertTrue("Second shard present",  writeShards.contains( secondShard ) );
+
+        assertTrue("Third shard present",  writeShards.contains( compactedShard ) );
+
+
+
+        assertEquals("Compaction target correct", secondShard, shardEntryGroup.getCompactionTarget());
+
+        writeShards = shardEntryGroup.getWriteShards(firstShard.getCreatedTime() +1 + delta);
+
+        assertEquals("Shard size correct", 1, writeShards.size());
+
+
+        assertTrue("Second shard present",  writeShards.contains( secondShard ) );
+
+    }
+
+
+    @Test(expected=IllegalArgumentException.class)
+    public void failsInsertionOrder() {
+
+        final long delta = 10000;
+
+        Shard secondShard = new Shard(20000, 10000, false);
+
+        Shard firstShard = new Shard(10000 , 10000, false );
+
+        Shard rootShard = new Shard( 0, 0, false );
+
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+        boolean result = shardEntryGroup.addShard( secondShard );
+
+        assertTrue( "Shard added", result );
+
+        result = shardEntryGroup.addShard( rootShard );
+
+        assertTrue( "Shard added", result );
+
+        //this should blow up, we can't add a shard in the middle, it must always be greater than the current max
+
+        shardEntryGroup.addShard( firstShard );
+
+
+    }
+
+
+
+    @Test
+    public void shardEntryAddList() {
+
+        final long delta = 10000;
+
+        Shard highShard = new Shard( 30000, 1000, false );
+
+        Shard midShard = new Shard( 20000, 1000, true );
+
+        Shard lowShard = new Shard( 10000, 1000, false);
+
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+        boolean result = shardEntryGroup.addShard( highShard );
+
+        assertTrue( "Shard added", result );
+
+        result = shardEntryGroup.addShard( midShard );
+
+        assertTrue( "Shard added", result );
+
+        result = shardEntryGroup.addShard( lowShard );
+
+        assertFalse( "Shard added", result );
+    }
+
+
+
+
 }