Posted to commits@usergrid.apache.org by to...@apache.org on 2015/08/24 23:46:01 UTC

[1/3] usergrid git commit: WIP overwrite.

Repository: usergrid
Updated Branches:
  refs/heads/USERGRID-909 [created] 9f3bf2b3a


WIP overwrite.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/42a4eeec
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/42a4eeec
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/42a4eeec

Branch: refs/heads/USERGRID-909
Commit: 42a4eeecb02ea7cb81c55ff8558f09775c84dcdd
Parents: 35430a5
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Aug 18 16:02:29 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Aug 18 16:02:29 2015 -0700

----------------------------------------------------------------------
 .../usergrid/persistence/graph/GraphFig.java    |  71 ++------
 .../persistence/graph/event/EdgeDeleted.java    |   8 -
 .../impl/shard/EdgeColumnFamilies.java          |  10 +-
 .../impl/shard/EdgeShardStrategy.java           |   8 +-
 .../impl/shard/NodeShardAllocation.java         |   8 +-
 .../impl/shard/NodeShardApproximation.java      |  10 +-
 .../impl/shard/NodeShardCache.java              |   7 +-
 .../impl/shard/ShardEntryGroup.java             |  18 +-
 .../impl/shard/ShardGroupCompaction.java        |   7 +-
 .../shard/impl/NodeShardAllocationImpl.java     |   2 +-
 .../impl/shard/impl/NodeShardCacheImpl.java     | 177 ++-----------------
 .../shard/impl/ShardEntryGroupIterator.java     |   7 +-
 .../shard/impl/ShardGroupCompactionImpl.java    |   3 +-
 .../usergrid/services/AbstractService.java      |   2 +-
 14 files changed, 64 insertions(+), 274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/42a4eeec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 28aab40..38506f3 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -42,24 +42,6 @@ public interface GraphFig extends GuicyFig {
      */
     String SHARD_SIZE = "usergrid.graph.shard.size";
 
-
-    /**
-     * Number of shards we can cache.
-     */
-    String SHARD_CACHE_SIZE = "usergrid.graph.shard.cache.size";
-
-
-    /**
-     * Get the cache timeout.  The local cache will exist for this amount of time max (in millis).
-     */
-    String SHARD_CACHE_TIMEOUT = "usergrid.graph.shard.cache.timeout";
-
-    /**
-     * Number of worker threads to refresh the cache
-     */
-    String SHARD_CACHE_REFRESH_WORKERS = "usergrid.graph.shard.refresh.worker.count";
-
-
     /**
      * The size of the worker count for shard auditing
      */
@@ -75,16 +57,6 @@ public interface GraphFig extends GuicyFig {
     String SHARD_REPAIR_CHANCE = "usergrid.graph.shard.repair.chance";
 
 
-    /**
-     * The minimum amount of time than can occur (in millis) between shard allocation and compaction.  Must be at least 2x the cache
-     * timeout. Set to 2.5x the cache timeout to be safe
-     *
-     * Note that you should also pad this for node clock drift.  A good value for this would be 2x the shard cache
-     * timeout + 30 seconds, assuming you have NTP and allow a max drift of 30 seconds
-     */
-    String SHARD_MIN_DELTA = "usergrid.graph.shard.min.delta";
-
-
     String COUNTER_WRITE_FLUSH_COUNT = "usergrid.graph.shard.counter.beginFlush.count";
 
     String COUNTER_WRITE_FLUSH_INTERVAL = "usergrid.graph.shard.counter.beginFlush.interval";
@@ -92,15 +64,13 @@ public interface GraphFig extends GuicyFig {
     String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size";
 
 
-
-
-    @Default("1000")
-    @Key(SCAN_PAGE_SIZE)
+    @Default( "1000" )
+    @Key( SCAN_PAGE_SIZE )
     int getScanPageSize();
 
 
-    @Default("5")
-    @Key(REPAIR_CONCURRENT_SIZE)
+    @Default( "5" )
+    @Key( REPAIR_CONCURRENT_SIZE )
     int getRepairConcurrentSize();
 
 
@@ -108,31 +78,10 @@ public interface GraphFig extends GuicyFig {
     @Key( SHARD_REPAIR_CHANCE )
     double getShardRepairChance();
 
-
     @Default( "500000" )
     @Key( SHARD_SIZE )
     long getShardSize();
 
-
-    @Default("30000")
-    @Key(SHARD_CACHE_TIMEOUT)
-    long getShardCacheTimeout();
-
-    @Default("60000")
-    @Key(SHARD_MIN_DELTA)
-    long getShardMinDelta();
-
-
-    @Default("250000")
-    @Key(SHARD_CACHE_SIZE)
-    long getShardCacheSize();
-
-
-    @Default("2")
-    @Key(SHARD_CACHE_REFRESH_WORKERS)
-    int getShardCacheRefreshWorkerCount();
-
-
     @Default( "10" )
     @Key( SHARD_AUDIT_WORKERS )
     int getShardAuditWorkerCount();
@@ -142,17 +91,17 @@ public interface GraphFig extends GuicyFig {
     int getShardAuditWorkerQueueSize();
 
 
-    @Default("10000")
-    @Key(COUNTER_WRITE_FLUSH_COUNT)
+    @Default( "10000" )
+    @Key( COUNTER_WRITE_FLUSH_COUNT )
     long getCounterFlushCount();
 
 
-    @Default("30000")
-    @Key(COUNTER_WRITE_FLUSH_INTERVAL)
+    @Default( "30000" )
+    @Key( COUNTER_WRITE_FLUSH_INTERVAL )
     long getCounterFlushInterval();
 
-    @Default("1000")
-    @Key(COUNTER_WRITE_FLUSH_QUEUE_SIZE)
+    @Default( "1000" )
+    @Key( COUNTER_WRITE_FLUSH_QUEUE_SIZE )
     int getCounterFlushQueueSize();
 }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42a4eeec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/event/EdgeDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/event/EdgeDeleted.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/event/EdgeDeleted.java
deleted file mode 100644
index 631de59..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/event/EdgeDeleted.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.apache.usergrid.persistence.graph.event;
-
-
-/**
- *
- *
- */
-public interface EdgeDeleted {}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42a4eeec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeColumnFamilies.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeColumnFamilies.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeColumnFamilies.java
index f1d6e37..4485b0b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeColumnFamilies.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeColumnFamilies.java
@@ -34,27 +34,27 @@ public interface EdgeColumnFamilies extends Migration{
     /**
      * Get the name of the column family for getting source nodes
      */
-    public MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> getSourceNodeCfName();
+    MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> getSourceNodeCfName();
 
     /**
      * Get the name of the column family for getting target nodes
      */
-    public MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> getTargetNodeCfName();
+    MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> getTargetNodeCfName();
 
 
     /**
      * Get the name of the column family for getting source nodes  with a target type
      */
-    public MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> getSourceNodeTargetTypeCfName();
+    MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> getSourceNodeTargetTypeCfName();
 
     /**
      * Get the name of the column family for getting target nodes with a source type
      */
-    public MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> getTargetNodeSourceTypeCfName();
+    MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> getTargetNodeSourceTypeCfName();
 
     /**
      * Get the Graph edge versions cf
      * @return
      */
-    public MultiTennantColumnFamily<ScopedRowKey<EdgeRowKey>, Long> getGraphEdgeVersions();
+    MultiTennantColumnFamily<ScopedRowKey<EdgeRowKey>, Long> getGraphEdgeVersions();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42a4eeec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
index 1e02a72..0c05ded 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
@@ -33,7 +33,8 @@ public interface EdgeShardStrategy {
      * @param scope The application's scope]
      * @param timestamp The timestamp on the edge
      */
-    public ShardEntryGroup getWriteShards( final ApplicationScope scope, final long timestamp, final DirectedEdgeMeta directedEdgeMeta );
+    ShardEntryGroup getWriteShards( final ApplicationScope scope, final long timestamp,
+                                    final DirectedEdgeMeta directedEdgeMeta );
 
 
     /**
@@ -42,7 +43,8 @@ public interface EdgeShardStrategy {
      * @param scope The application scope
      * @param maxTimestamp The max timestamp to use
      */
-    public Iterator<ShardEntryGroup> getReadShards(final ApplicationScope scope, final long maxTimestamp, final DirectedEdgeMeta directedEdgeMeta );
+    Iterator<ShardEntryGroup> getReadShards( final ApplicationScope scope, final long maxTimestamp,
+                                             final DirectedEdgeMeta directedEdgeMeta );
 
     /**
      * Increment our count meta data by the passed value.  Can be a positive or a negative number.
@@ -52,7 +54,7 @@ public interface EdgeShardStrategy {
      * @param directedEdgeMeta The edge meta data to use
      * @return
      */
-    public void increment(final ApplicationScope scope, Shard shard, long count, final DirectedEdgeMeta directedEdgeMeta );
+    void increment( final ApplicationScope scope, Shard shard, long count, final DirectedEdgeMeta directedEdgeMeta );
 
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42a4eeec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
index fcc0fc3..cadd0db 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
@@ -42,7 +42,8 @@ public interface NodeShardAllocation {
      * @param directedEdgeMeta The directed edge metadata to use
      * @return A list of all shards <= the current shard.  This will always return 0l if no shards are allocated
      */
-    public Iterator<ShardEntryGroup> getShards( final ApplicationScope scope, Optional<Shard> maxShardId, final DirectedEdgeMeta directedEdgeMeta );
+    Iterator<ShardEntryGroup> getShards( final ApplicationScope scope, Optional<Shard> maxShardId,
+                                         final DirectedEdgeMeta directedEdgeMeta );
 
 
     /**
@@ -53,13 +54,14 @@ public interface NodeShardAllocation {
      * @param directedEdgeMeta The directed edge metadata to use
      * @return True if a new shard was allocated
      */
-    public boolean auditShard(final ApplicationScope scope, final ShardEntryGroup shardEntryGroup, final DirectedEdgeMeta directedEdgeMeta);
+    boolean auditShard( final ApplicationScope scope, final ShardEntryGroup shardEntryGroup,
+                        final DirectedEdgeMeta directedEdgeMeta );
 
     /**
      * Get the minimum time that a created shard should be considered "new", and be used for both new writes and reads
      * @return
      */
-    public long getMinTime();
+    long getMinTime();
 
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42a4eeec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
index fc39e56..078fa1b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
@@ -37,8 +37,8 @@ public interface NodeShardApproximation {
      * @param count The count to increment
      * @param directedEdgeMeta The directed edge meta data to use
      */
-    public void increment( final ApplicationScope scope, final Shard shard,
-                           final long count, final DirectedEdgeMeta directedEdgeMeta );
+    void increment( final ApplicationScope scope, final Shard shard, final long count,
+                    final DirectedEdgeMeta directedEdgeMeta );
 
 
     /**
@@ -47,20 +47,20 @@ public interface NodeShardApproximation {
      * @param scope The scope
      * @param directedEdgeMeta The directed edge meta data to use
      */
-    public long getCount( final ApplicationScope scope, final Shard shard,  final DirectedEdgeMeta directedEdgeMeta );
+    long getCount( final ApplicationScope scope, final Shard shard, final DirectedEdgeMeta directedEdgeMeta );
 
 
     /**
      * Flush the current counters in the Approximation.  Will return immediately after the flush. You can then use flushPending
      * to check the state.
      */
-    public void beginFlush();
+    void beginFlush();
 
     /**
      * Return true if there is data to be flushed
      * @return
      */
-    public boolean flushPending();
+    boolean flushPending();
 
 
 }
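
The NodeShardApproximation changes above are modifier and formatting cleanups only, but the flush contract the javadoc describes (beginFlush() returns immediately, flushPending() reports outstanding work) is easy to misread. A small illustrative flow against the signatures shown above; the helper name, the 100 ms poll interval, and the scope/shard/directedEdgeMeta arguments are assumptions for the example, not code from this commit:

    static long incrementAndWaitForFlush( final NodeShardApproximation approximation, final ApplicationScope scope,
                                          final Shard shard, final DirectedEdgeMeta directedEdgeMeta )
            throws InterruptedException {

        // record a delta against this shard's approximate counter
        approximation.increment( scope, shard, 1L, directedEdgeMeta );

        // beginFlush() only schedules the flush and returns immediately...
        approximation.beginFlush();

        // ...so poll flushPending() if the caller needs the counters persisted before reading
        while ( approximation.flushPending() ) {
            Thread.sleep( 100 );
        }

        return approximation.getCount( scope, shard, directedEdgeMeta );
    }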

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42a4eeec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
index 173b89d..2f13a4d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
@@ -38,8 +38,8 @@ public interface NodeShardCache {
      * @param timestamp The time to select the slice for.
      * @param directedEdgeMeta The directed edge meta data
      */
-    public ShardEntryGroup getWriteShardGroup( final ApplicationScope scope,
-                                               final long timestamp, final DirectedEdgeMeta directedEdgeMeta );
+    ShardEntryGroup getWriteShardGroup( final ApplicationScope scope, final long timestamp,
+                                        final DirectedEdgeMeta directedEdgeMeta );
 
     /**
      * Get an iterator of all versions <= the version for iterating shard entry sets.  The iterator of groups will be ordered
@@ -49,6 +49,7 @@ public interface NodeShardCache {
      * @param directedEdgeMeta The directed edge meta data
      * @return
      */
-    public Iterator<ShardEntryGroup> getReadShardGroup( final ApplicationScope scope, final long maxTimestamp, final DirectedEdgeMeta directedEdgeMeta  );
+    Iterator<ShardEntryGroup> getReadShardGroup( final ApplicationScope scope, final long maxTimestamp,
+                                                 final DirectedEdgeMeta directedEdgeMeta );
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42a4eeec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
index 11bf7a4..90f703d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -45,8 +45,6 @@ public class ShardEntryGroup {
 
     private List<Shard> shards;
 
-    private final long delta;
-
     private long maxCreatedTime;
 
     private Shard compactionTarget;
@@ -57,9 +55,8 @@ public class ShardEntryGroup {
     /**
      * The max delta we accept in milliseconds for create time to be considered a member of this group
      */
-    public ShardEntryGroup( final long delta ) {
-        Preconditions.checkArgument( delta > 0, "delta must be greater than 0" );
-        this.delta = delta;
+    public ShardEntryGroup( ) {
+
         this.shards = new ArrayList<>();
         this.maxCreatedTime = 0;
     }
@@ -283,15 +280,7 @@ public class ShardEntryGroup {
         /**
          * We don't have enough shards to compact, ignore
          */
-        return getCompactionTarget() != null
-
-
-                /**
-                 * If something was created within the delta time frame, not everyone may have seen it due to
-                 * cache refresh, we can't compact yet.
-                 */
-
-                && currentTime - delta > maxCreatedTime;
+        return getCompactionTarget() != null;
     }
 
 
@@ -316,7 +305,6 @@ public class ShardEntryGroup {
     public String toString() {
         return "ShardEntryGroup{" +
                 "shards=" + shards +
-                ", delta=" + delta +
                 ", maxCreatedTime=" + maxCreatedTime +
                 ", compactionTarget=" + compactionTarget +
                 '}';

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42a4eeec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
index 4fe1a63..f54bd55 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -42,12 +42,11 @@ public interface ShardGroupCompaction {
      *
      * @return A ListenableFuture with the result.  Note that some
      */
-    public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
-                                                             final DirectedEdgeMeta edgeMeta,
-                                                             final ShardEntryGroup group );
+    ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+                                                      final ShardEntryGroup group );
 
 
-    public enum AuditResult {
+    enum AuditResult {
         /**
          * We didn't check this shard
          */

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42a4eeec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 7a7fb3f..80c6a5f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -120,7 +120,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
             existingShards = Collections.singleton( MIN_SHARD ).iterator();
         }
 
-        return new ShardEntryGroupIterator( existingShards, graphFig.getShardMinDelta(), shardGroupCompaction, scope,
+        return new ShardEntryGroupIterator( existingShards, shardGroupCompaction, scope,
                 directedEdgeMeta );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42a4eeec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index c5f0bfb..80375a8 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -19,25 +19,14 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 
-import java.beans.PropertyChangeEvent;
-import java.beans.PropertyChangeListener;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.graph.GraphFig;
@@ -52,78 +41,28 @@ import org.apache.usergrid.persistence.graph.serialization.util.IterableUtil;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.Weigher;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
 
 
-
 /**
  * Simple implementation of the shard.  Uses a local Guava shard with a timeout.  If a value is not present in the
  * shard, it will need to be searched via cassandra.
  */
 public class NodeShardCacheImpl implements NodeShardCache {
 
-    private static final Logger LOG = LoggerFactory.getLogger( NodeShardCacheImpl.class );
-
-    /**
-     * Only cache shards that have < 10k groups.  This is an arbitrary amount, and may change with profiling and
-     * testing
-     */
-    private static final int MAX_WEIGHT_PER_ELEMENT = 10000;
-
-
     private final NodeShardAllocation nodeShardAllocation;
-    private final GraphFig graphFig;
-
-
-
-    private ListeningScheduledExecutorService refreshExecutors;
-    private LoadingCache<CacheKey, CacheEntry> graphs;
 
 
     /**
      *  @param nodeShardAllocation
-     * @param graphFig
      */
     @Inject
-    public NodeShardCacheImpl( final NodeShardAllocation nodeShardAllocation, final GraphFig graphFig) {
-
+    public NodeShardCacheImpl( final NodeShardAllocation nodeShardAllocation ) {
         Preconditions.checkNotNull( nodeShardAllocation, "nodeShardAllocation is required" );
-        Preconditions.checkNotNull( graphFig, "consistencyFig is required" );
 
         this.nodeShardAllocation = nodeShardAllocation;
-        this.graphFig = graphFig;
-
-
-        /**
-         * Add our listener to reconstruct the shard
-         */
-        this.graphFig.addPropertyChangeListener( new PropertyChangeListener() {
-            @Override
-            public void propertyChange( final PropertyChangeEvent evt ) {
-                final String propertyName = evt.getPropertyName();
 
-                if ( propertyName.equals( GraphFig.SHARD_CACHE_SIZE ) || propertyName
-                        .equals( GraphFig.SHARD_CACHE_TIMEOUT ) || propertyName
-                        .equals( GraphFig.SHARD_CACHE_REFRESH_WORKERS ) ) {
 
-
-                    updateCache();
-                }
-            }
-        } );
-
-        /**
-         * Initialize the shard cache
-         */
-        updateCache();
     }
 
 
@@ -137,12 +76,8 @@ public class NodeShardCacheImpl implements NodeShardCache {
         final CacheKey key = new CacheKey( scope, directedEdgeMeta );
         CacheEntry entry;
 
-        try {
-            entry = this.graphs.get( key );
-        }
-        catch ( ExecutionException e ) {
-            throw new GraphRuntimeException( "Unable to load shard key for graph", e );
-        }
+        entry = getShards( key );
+
 
         final ShardEntryGroup shardId = entry.getShardId( timestamp );
 
@@ -164,12 +99,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
         final CacheKey key = new CacheKey( scope, directedEdgeMeta );
         CacheEntry entry;
 
-        try {
-            entry = this.graphs.get( key );
-        }
-        catch ( ExecutionException e ) {
-            throw new GraphRuntimeException( "Unable to load shard key for graph", e );
-        }
+        entry = getShards( key );
 
         Iterator<ShardEntryGroup> iterator = entry.getShards( maxTimestamp );
 
@@ -182,33 +112,6 @@ public class NodeShardCacheImpl implements NodeShardCache {
 
 
     /**
-     * This is a race condition.  We could re-init the shard while another thread is reading it.  This is fine, the read
-     * doesn't have to be precise.  The algorithm accounts for stale data.
-     */
-    private void updateCache() {
-        if ( this.refreshExecutors != null ) {
-            this.refreshExecutors.shutdown();
-        }
-
-        this.refreshExecutors = MoreExecutors
-                .listeningDecorator( Executors.newScheduledThreadPool( graphFig.getShardCacheRefreshWorkerCount() ) );
-
-
-        this.graphs = CacheBuilder.newBuilder()
-
-                //we want to asynchronously load new values for existing ones, that way we wont' have to
-                //wait for a trip to cassandra
-                .refreshAfterWrite( graphFig.getShardCacheTimeout(), TimeUnit.MILLISECONDS )
-
-                        //set our weight function, since not all shards are equal
-                .maximumWeight(MAX_WEIGHT_PER_ELEMENT * graphFig.getShardCacheSize() ).weigher( new ShardWeigher() )
-
-                        //set our shard loader
-                .build( new ShardCacheLoader() );
-    }
-
-
-    /**
      * Cache key for looking up items in the shard
      */
     private static class CacheKey {
@@ -255,6 +158,16 @@ public class NodeShardCacheImpl implements NodeShardCache {
     }
 
 
+    private CacheEntry getShards( final CacheKey key ) {
+        final Iterator<ShardEntryGroup> edges =
+            nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta );
+
+        final CacheEntry cacheEntry = new CacheEntry( edges );
+
+        return cacheEntry;
+    }
+
+
     /**
      * An entry for the shard.
      */
@@ -266,8 +179,8 @@ public class NodeShardCacheImpl implements NodeShardCache {
 
 
         private CacheEntry( final Iterator<ShardEntryGroup> shards ) {
-            Preconditions.checkArgument( shards.hasNext(),
-                    "More than 1 entry must be present in the shard to load into cache" );
+            Preconditions
+                .checkArgument( shards.hasNext(), "More than 1 entry must be present in the shard to load into cache" );
 
             this.shards = new TreeMap<>();
             /**
@@ -281,21 +194,14 @@ public class NodeShardCacheImpl implements NodeShardCache {
 
 
         /**
-         * Return the size of the elements in the cache
-         */
-        public int getCacheSize() {
-            return this.shards.size();
-        }
-
-
-        /**
          * Get all shards <= this one in descending order
          */
         public Iterator<ShardEntryGroup> getShards( final Long maxShard ) {
 
             final Long firstKey = shards.floorKey( maxShard );
 
-            return Collections.unmodifiableCollection( shards.headMap( firstKey, true ).descendingMap().values()).iterator();
+            return Collections.unmodifiableCollection( shards.headMap( firstKey, true ).descendingMap().values() )
+                              .iterator();
         }
 
 
@@ -313,51 +219,4 @@ public class NodeShardCacheImpl implements NodeShardCache {
         }
     }
 
-
-    /**
-     * Load the cache entries from the shards we have stored
-     */
-    final class ShardCacheLoader extends CacheLoader<CacheKey, CacheEntry> {
-
-
-        @Override
-        public CacheEntry load( final CacheKey key ) {
-
-
-            final Iterator<ShardEntryGroup> edges =
-                    nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta );
-
-            final CacheEntry cacheEntry = new CacheEntry( edges );
-
-            return cacheEntry;
-        }
-
-
-        @Override
-        public ListenableFuture<CacheEntry> reload( final CacheKey key, final CacheEntry oldValue ) throws Exception {
-            ListenableFutureTask<CacheEntry> task = ListenableFutureTask.create( new Callable<CacheEntry>() {
-                public CacheEntry call() {
-                    return load( key );
-                }
-            } );
-            //load via the refresh executor
-            refreshExecutors.execute( task );
-            return task;
-        }
-
-        //TODO, use RX for sliding window buffering and duplicate removal
-    }
-
-
-
-    /**
-     * Calculates the weight of the entry by geting the size of the cache
-     */
-    final class ShardWeigher implements Weigher<CacheKey, CacheEntry> {
-
-        @Override
-        public int weigh( final CacheKey key, final CacheEntry value ) {
-            return value.getCacheSize();
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42a4eeec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
index f1b5108..cbe35b6 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
@@ -26,7 +26,6 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
 
     private final ShardGroupCompaction shardGroupCompaction;
     private final PushbackIterator<Shard> sourceIterator;
-    private final long minDelta;
     private final ApplicationScope scope;
     private final DirectedEdgeMeta directedEdgeMeta;
 
@@ -39,9 +38,8 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
      *
      * @param shardIterator The iterator of all shards.  Order is expected to be by the  shard index from Long.MAX to
      * Long.MIN
-     * @param minDelta The minimum delta we allow to consider shards the same group
      */
-    public ShardEntryGroupIterator( final Iterator<Shard> shardIterator, final long minDelta,
+    public ShardEntryGroupIterator( final Iterator<Shard> shardIterator,
                                     final ShardGroupCompaction shardGroupCompaction, final ApplicationScope scope,
                                     final DirectedEdgeMeta directedEdgeMeta ) {
 
@@ -51,7 +49,6 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
         this.directedEdgeMeta = directedEdgeMeta;
         this.sourceIterator = new PushbackIterator( shardIterator );
         this.shardGroupCompaction = shardGroupCompaction;
-        this.minDelta = minDelta;
     }
 
 
@@ -98,7 +95,7 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
         while ( sourceIterator.hasNext() ) {
 
             if ( next == null ) {
-                next = new ShardEntryGroup( minDelta );
+                next = new ShardEntryGroup( );
             }
 
             final Shard shard = sourceIterator.next();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42a4eeec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 2194400..7c1c816 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -152,6 +152,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         if(LOG.isDebugEnabled()) {
             LOG.debug("Compacting shard group. count is {} ", countAudits.get());
         }
+
         final CompactionResult.CompactionBuilder resultBuilder = CompactionResult.builder();
 
         final Shard targetShard = group.getCompactionTarget();
@@ -305,7 +306,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         }
 
         countAudits.getAndIncrement();
-        
+
         if(LOG.isDebugEnabled()) {
             LOG.debug("Auditing shard group. count is {} ", countAudits.get());
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42a4eeec/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
index 95a651d..c05dea5 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
@@ -358,7 +358,7 @@ public abstract class AbstractService implements Service {
     }
 
 
-    public Entity importEntity( ServiceContext context, Entity entity ) throws Exception {
+    public Entity  importEntity( ServiceContext context, Entity entity ) throws Exception {
         return importEntity( context.getRequest(), entity );
     }
 


[3/3] usergrid git commit: Updated tests and finished basic check + allocate algorithm

Posted by to...@apache.org.
Updated tests and finished basic check + allocate algorithm


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9f3bf2b3
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9f3bf2b3
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9f3bf2b3

Branch: refs/heads/USERGRID-909
Commit: 9f3bf2b3a77777f67abb540b636b614fed851b94
Parents: 42a4eee
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Aug 24 15:44:33 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Aug 24 15:44:33 2015 -0600

----------------------------------------------------------------------
 .../core/astyanax/CassandraConfig.java          |    1 +
 .../usergrid/persistence/graph/GraphFig.java    |   24 +
 .../persistence/graph/guice/GraphModule.java    |    6 +-
 .../impl/shard/EdgeShardSerialization.java      |   16 +-
 .../impl/shard/NodeShardAllocation.java         |    6 -
 .../impl/shard/ShardConsistency.java            |   46 +
 .../impl/shard/ShardEntryGroup.java             |   57 +-
 .../shard/ShardedEdgeSerializationImpl.java     | 1008 ++++++++++++++++++
 .../shard/impl/EdgeShardSerializationImpl.java  |   71 +-
 .../shard/impl/NodeShardAllocationImpl.java     |  109 +-
 .../impl/shard/impl/ShardConsistencyImpl.java   |   58 +
 .../impl/ShardedEdgeSerializationImpl.java      | 1006 -----------------
 .../graph/GraphManagerShardConsistencyIT.java   |   16 +-
 .../impl/shard/EdgeShardSerializationTest.java  |   14 +-
 .../impl/shard/NodeShardAllocationTest.java     |  142 +--
 .../impl/shard/NodeShardCacheTest.java          |   14 +-
 .../impl/shard/ShardEntryGroupTest.java         |   62 +-
 .../impl/shard/ShardGroupCompactionTest.java    |    2 +-
 .../shard/count/NodeShardApproximationTest.java |   37 +-
 .../shard/impl/ShardEntryGroupIteratorTest.java |    8 +-
 .../SearchRequestBuilderStrategyV2.java         |    2 +-
 .../index/impl/EsEntityIndexImpl.java           |    2 +-
 22 files changed, 1361 insertions(+), 1346 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
index 817aee2..bbefe0f 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
@@ -55,4 +55,5 @@ public interface CassandraConfig {
     public int[] getShardSettings();
 
 
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 38506f3..d0df2eb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -54,6 +54,12 @@ public interface GraphFig extends GuicyFig {
     String SHARD_AUDIT_WORKERS = "usergrid.graph.shard.audit.worker.count";
 
 
+    String SHARD_WRITE_CONSISTENCY = "usergrid.graph.shard.write.consistency";
+
+    String SHARD_READ_CONSISTENCY = "usergrid.graph.shard.read.consistency";
+
+    String SHARD_AUDIT_CONSISTENCY = "usergrid.graph.shard.audit.consistency";
+
     String SHARD_REPAIR_CHANCE = "usergrid.graph.shard.repair.chance";
 
 
@@ -103,5 +109,23 @@ public interface GraphFig extends GuicyFig {
     @Default( "1000" )
     @Key( COUNTER_WRITE_FLUSH_QUEUE_SIZE )
     int getCounterFlushQueueSize();
+
+    @Default( "CL_EACH_QUORUM" )
+    @Key( SHARD_WRITE_CONSISTENCY )
+    String getShardWriteConsistency();
+
+    /**
+     * Get the consistency level for doing reads
+     */
+    @Default( "CL_LOCAL_QUORUM" )
+    @Key( SHARD_READ_CONSISTENCY )
+    String getShardReadConsistency();
+
+    /**
+     * Get the consistency level for performing a shard audit
+     */
+    @Default( "CL_EACH_QUORUM" )
+    @Key( SHARD_AUDIT_CONSISTENCY )
+    String getShardAuditConsistency();
 }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 4b628d1..1cca5b2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -61,6 +61,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardS
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardConsistency;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardApproximationImpl;
@@ -69,8 +70,9 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.Node
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeShardSerializationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardCacheImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardConsistencyImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardedEdgeSerializationImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerializationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeColumnFamilies;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeShardStrategy;
 
@@ -114,6 +116,8 @@ public abstract class GraphModule extends AbstractModule {
         bind( NodeShardApproximation.class ).to( NodeShardApproximationImpl.class );
         bind( NodeShardCache.class ).to( NodeShardCacheImpl.class );
         bind( NodeShardCounterSerialization.class ).to( NodeShardCounterSerializationImpl.class );
+        bind( ShardConsistency.class).to( ShardConsistencyImpl.class);
+
 
         /**
          * Bind our strategies based on their internal annotations.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
index 93fd685..d8c561f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
@@ -43,13 +43,25 @@ public interface EdgeShardSerialization extends Migration{
     public MutationBatch writeShardMeta( ApplicationScope scope, Shard shard, DirectedEdgeMeta directedEdgeMeta );
 
     /**
-     * Get an iterator of all meta data and types.  Returns a range from High to low
+     * Get an iterator of all meta data and types.  Returns a range from High to low.  Only reads the local region
      * @param scope The organization scope
      * @param start The shard time to start seeking from.  Values <= this value will be returned.
      * @param directedEdgeMeta The edge meta data to use
      * @return
      */
-    public Iterator<Shard> getShardMetaData( ApplicationScope scope, Optional<Shard> start,  DirectedEdgeMeta directedEdgeMeta);
+    public Iterator<Shard> getShardMetaDataLocal( ApplicationScope scope, Optional<Shard> start,
+                                                  DirectedEdgeMeta directedEdgeMeta );
+
+
+    /**
+     * Get an iterator of all meta data and types.  Returns a range from High to low.  Reads quorum of all regions
+     * @param scope The organization scope
+     * @param start The shard time to start seeking from.  Values <= this value will be returned.
+     * @param directedEdgeMeta The edge meta data to use
+     * @return
+     */
+    Iterator<Shard> getShardMetaDataAudit( ApplicationScope scope, Optional<Shard> start,
+                                           DirectedEdgeMeta directedEdgeMeta );
 
     /**
      * Remove the shard from the edge meta data from the types.
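
The rename from getShardMetaData to getShardMetaDataLocal plus the new getShardMetaDataAudit splits shard-metadata reads into a local-region read and a quorum-of-all-regions read. The implementation is not included in this message, but the javadoc suggests the two paths differ only in the consistency level applied to the underlying query. A minimal sketch of that shape, assuming a shardConsistency field (the ShardConsistency interface added later in this commit) and a hypothetical private getShardMetaData(..., ConsistencyLevel) helper that builds the usual range query:

    @Override
    public Iterator<Shard> getShardMetaDataLocal( final ApplicationScope scope, final Optional<Shard> start,
                                                  final DirectedEdgeMeta directedEdgeMeta ) {
        // local-region read: apply the configured read consistency (default CL_LOCAL_QUORUM)
        return getShardMetaData( scope, start, directedEdgeMeta, shardConsistency.getShardReadConsistency() );
    }

    @Override
    public Iterator<Shard> getShardMetaDataAudit( final ApplicationScope scope, final Optional<Shard> start,
                                                  final DirectedEdgeMeta directedEdgeMeta ) {
        // audit read: apply the stronger audit consistency (default CL_EACH_QUORUM)
        return getShardMetaData( scope, start, directedEdgeMeta, shardConsistency.getShardAuditConsistency() );
    }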

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
index cadd0db..5039b35 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
@@ -57,11 +57,5 @@ public interface NodeShardAllocation {
     boolean auditShard( final ApplicationScope scope, final ShardEntryGroup shardEntryGroup,
                         final DirectedEdgeMeta directedEdgeMeta );
 
-    /**
-     * Get the minimum time that a created shard should be considered "new", and be used for both new writes and reads
-     * @return
-     */
-    long getMinTime();
-
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardConsistency.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardConsistency.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardConsistency.java
new file mode 100644
index 0000000..5c52af6
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardConsistency.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import com.netflix.astyanax.model.ConsistencyLevel;
+
+
+/**
+ * Interface for shard consistency levels
+ */
+public interface ShardConsistency {
+
+    /**
+     * Get the consistency level for wiring new shards
+     * @return
+     */
+    ConsistencyLevel getShardWriteConsistency();
+
+    /**
+     * Get the consistency level for doing reads
+     * @return
+     */
+    ConsistencyLevel getShardReadConsistency();
+
+    /**
+     * Get the consistency level for performing a shard audit
+     * @return
+     */
+    ConsistencyLevel getShardAuditConsistency();
+}
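
ShardConsistencyImpl itself (58 lines in the diffstat) is not part of this excerpt. Given that the new GraphFig defaults are Astyanax ConsistencyLevel enum names ("CL_EACH_QUORUM", "CL_LOCAL_QUORUM"), a plausible minimal implementation is a GraphFig-backed adapter; the following is a sketch under that assumption, not the committed code:

    import org.apache.usergrid.persistence.graph.GraphFig;

    import com.google.inject.Inject;
    import com.netflix.astyanax.model.ConsistencyLevel;


    // illustrative sketch; the committed ShardConsistencyImpl is not shown in this message
    public class ShardConsistencyImpl implements ShardConsistency {

        private final GraphFig graphFig;

        @Inject
        public ShardConsistencyImpl( final GraphFig graphFig ) {
            this.graphFig = graphFig;
        }

        @Override
        public ConsistencyLevel getShardWriteConsistency() {
            // GraphFig values are expected to be ConsistencyLevel names, e.g. "CL_EACH_QUORUM"
            return ConsistencyLevel.valueOf( graphFig.getShardWriteConsistency() );
        }

        @Override
        public ConsistencyLevel getShardReadConsistency() {
            return ConsistencyLevel.valueOf( graphFig.getShardReadConsistency() );
        }

        @Override
        public ConsistencyLevel getShardAuditConsistency() {
            return ConsistencyLevel.valueOf( graphFig.getShardAuditConsistency() );
        }
    }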

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
index 90f703d..f1bc42f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -25,7 +25,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,8 +54,7 @@ public class ShardEntryGroup {
     /**
      * The max delta we accept in milliseconds for create time to be considered a member of this group
      */
-    public ShardEntryGroup( ) {
-
+    public ShardEntryGroup() {
         this.shards = new ArrayList<>();
         this.maxCreatedTime = 0;
     }
@@ -95,8 +93,6 @@ public class ShardEntryGroup {
         }
 
 
-
-
         return false;
     }
 
@@ -129,6 +125,22 @@ public class ShardEntryGroup {
 
 
     /**
+     * Get the max shard based on time indexes
+     *
+     * @return
+     */
+    public Shard getMaxShard() {
+        final int size = shards.size();
+
+        if ( size < 1 ) {
+            return null;
+        }
+
+        return shards.get( size - 1 );
+    }
+
+
+    /**
      * Get the entries that we should read from.
      */
     public Collection<Shard> getReadShards() {
@@ -138,20 +150,17 @@ public class ShardEntryGroup {
         final Shard compactionTarget = getCompactionTarget();
 
 
-
-        if(compactionTarget != null){
+        if ( compactionTarget != null ) {
             LOG.debug( "Returning shards {} and {} as read shards", compactionTarget, staticShard );
             return Arrays.asList( compactionTarget, staticShard );
         }
 
 
         LOG.debug( "Returning shards {} read shard", staticShard );
-        return  Collections.singleton( staticShard );
+        return Collections.singleton( staticShard );
     }
 
 
-
-
     /**
      * Get the entries, with the max shard time being first. We write to all shards until they're migrated
      */
@@ -165,19 +174,17 @@ public class ShardEntryGroup {
 
             final Shard compactionTarget = getCompactionTarget();
 
-            LOG.debug( "Returning shard {} as write shard", compactionTarget);
-
-            return Collections.singleton( compactionTarget  );
+            LOG.debug( "Returning shard {} as write shard", compactionTarget );
 
+            return Collections.singleton( compactionTarget );
         }
 
         final Shard staticShard = getRootShard();
 
 
-        LOG.debug( "Returning shard {} as write shard", staticShard);
+        LOG.debug( "Returning shard {} as write shard", staticShard );
 
         return Collections.singleton( staticShard );
-
     }
 
 
@@ -191,22 +198,22 @@ public class ShardEntryGroup {
 
     /**
      * Get the root shard that was created in this group
-     * @return
      */
-    private Shard getRootShard(){
-        if(rootShard != null){
+    private Shard getRootShard() {
+        if ( rootShard != null ) {
             return rootShard;
         }
 
-        final Shard rootCandidate = shards.get( shards.size() -1 );
+        final Shard rootCandidate = shards.get( shards.size() - 1 );
 
-        if(rootCandidate.isCompacted()){
+        if ( rootCandidate.isCompacted() ) {
             rootShard = rootCandidate;
         }
 
         return rootShard;
     }
 
+
     /**
      * Get the shard all compactions should write to.  Null indicates we cannot find a shard that could be used as a
      * compaction target.  Note that this shard may not have surpassed the delta yet You should invoke "shouldCompact"
@@ -297,16 +304,16 @@ public class ShardEntryGroup {
 
 
         return !shard.isCompacted() && ( compactionTarget != null && compactionTarget.getShardIndex() != shard
-                .getShardIndex() );
+            .getShardIndex() );
     }
 
 
     @Override
     public String toString() {
         return "ShardEntryGroup{" +
-                "shards=" + shards +
-                ", maxCreatedTime=" + maxCreatedTime +
-                ", compactionTarget=" + compactionTarget +
-                '}';
+            "shards=" + shards +
+            ", maxCreatedTime=" + maxCreatedTime +
+            ", compactionTarget=" + compactionTarget +
+            '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerializationImpl.java
new file mode 100644
index 0000000..3329ff1
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerializationImpl.java
@@ -0,0 +1,1008 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.UUID;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.consistency.TimeService;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.ValidationUtils;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.SearchByIdType;
+import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeSearcher;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardsColumnIterator;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.DescendingTimestampComparator;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.OrderedComparator;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.SourceDirectedEdgeDescendingComparator;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.TargetDirectedEdgeDescendingComparator;
+import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.Serializer;
+import com.netflix.astyanax.util.RangeBuilder;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
+/**
+ * TODO: Refactor this to use shards only, with no shard groups, just collections of shards.  The parent caller can
+ * aggregate the results of multiple groups together; the current grouping creates an impedance mismatch in the API layer.
+ */
+@Singleton
+public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
+
+    protected final Keyspace keyspace;
+    protected final CassandraConfig cassandraConfig;
+    protected final GraphFig graphFig;
+    protected final EdgeShardStrategy writeEdgeShardStrategy;
+    protected final TimeService timeService;
+
+
+    @Inject
+    public ShardedEdgeSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
+                                         final GraphFig graphFig, final EdgeShardStrategy writeEdgeShardStrategy,
+                                         final TimeService timeService ) {
+
+
+        checkNotNull( keyspace, "keyspace required" );
+        checkNotNull( cassandraConfig, "cassandraConfig required" );
+        checkNotNull( graphFig, "graphFig required" );
+        checkNotNull( writeEdgeShardStrategy, "writeEdgeShardStrategy required" );
+        checkNotNull( timeService, "timeService required" );
+
+
+        this.keyspace = keyspace;
+        this.cassandraConfig = cassandraConfig;
+        this.graphFig = graphFig;
+        this.writeEdgeShardStrategy = writeEdgeShardStrategy;
+        this.timeService = timeService;
+    }
+
+
+    @Override
+    public MutationBatch writeEdgeFromSource( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                              final MarkedEdge markedEdge, final Collection<Shard> shards,
+                                              final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateEdge( markedEdge );
+        ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+        return new SourceWriteOp( columnFamilies, markedEdge ) {
+
+            @Override
+            void writeEdge( final MutationBatch batch,
+                            final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> columnFamily,
+                            final ApplicationScope scope, final RowKey rowKey, final DirectedEdge edge,
+                            final Shard shard, final boolean isDeleted ) {
+
+                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).putColumn( edge, isDeleted );
+
+                if ( !isDeleted ) {
+                    writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
+                }
+            }
+        }.createBatch( scope, shards, timestamp );
+    }
+
+
+    @Override
+    public MutationBatch writeEdgeFromSourceWithTargetType( final EdgeColumnFamilies columnFamilies,
+                                                            final ApplicationScope scope, final MarkedEdge markedEdge,
+                                                            final Collection<Shard> shards,
+                                                            final DirectedEdgeMeta directedEdgeMeta,
+                                                            final UUID timestamp ) {
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateEdge( markedEdge );
+        ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+
+        return new SourceTargetTypeWriteOp( columnFamilies, markedEdge ) {
+
+            @Override
+            void writeEdge( final MutationBatch batch,
+                            final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily,
+                            final ApplicationScope scope, final RowKeyType rowKey, final DirectedEdge edge,
+                            final Shard shard, final boolean isDeleted ) {
+
+
+                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).putColumn( edge, isDeleted );
+
+
+                if ( !isDeleted ) {
+                    writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
+                }
+            }
+        }.createBatch( scope, shards, timestamp );
+    }
+
+
+    @Override
+    public MutationBatch writeEdgeToTarget( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                            final MarkedEdge markedEdge, final Collection<Shard> shards,
+                                            final DirectedEdgeMeta targetEdgeMeta, final UUID timestamp ) {
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateEdge( markedEdge );
+        ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+
+        return new TargetWriteOp( columnFamilies, markedEdge ) {
+
+            @Override
+            void writeEdge( final MutationBatch batch,
+                            final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> columnFamily,
+                            final ApplicationScope scope, final RowKey rowKey, final DirectedEdge edge,
+                            final Shard shard, final boolean isDeleted ) {
+
+                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).putColumn( edge, isDeleted );
+
+
+                if ( !isDeleted ) {
+                    writeEdgeShardStrategy.increment( scope, shard, 1, targetEdgeMeta );
+                }
+            }
+        }.createBatch( scope, shards, timestamp );
+    }
+
+
+    @Override
+    public MutationBatch writeEdgeToTargetWithSourceType( final EdgeColumnFamilies columnFamilies,
+                                                          final ApplicationScope scope, final MarkedEdge markedEdge,
+                                                          final Collection<Shard> shards,
+                                                          final DirectedEdgeMeta directedEdgeMeta,
+                                                          final UUID timestamp ) {
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateEdge( markedEdge );
+        ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+
+        return new TargetSourceTypeWriteOp( columnFamilies, markedEdge ) {
+
+            @Override
+            void writeEdge( final MutationBatch batch,
+                            final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily,
+                            final ApplicationScope scope, final RowKeyType rowKey, final DirectedEdge edge,
+                            final Shard shard, final boolean isDeleted ) {
+
+                batch.withRow( columnFamilies.getTargetNodeSourceTypeCfName(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
+                     .putColumn( edge, isDeleted );
+
+
+                if ( !isDeleted ) {
+                    writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
+                }
+            }
+        }.createBatch( scope, shards, timestamp );
+    }
+
+
+    @Override
+    public MutationBatch writeEdgeVersions( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                            final MarkedEdge markedEdge, final Collection<Shard> shards,
+                                            final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
+
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateEdge( markedEdge );
+        ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+
+        return new EdgeVersions( columnFamilies, markedEdge ) {
+
+            @Override
+            void writeEdge( final MutationBatch batch,
+                            final MultiTennantColumnFamily<ScopedRowKey<EdgeRowKey>, Long> columnFamily,
+                            final ApplicationScope scope, final EdgeRowKey rowKey, final Long column, final Shard shard,
+                            final boolean isDeleted ) {
+                batch.withRow( columnFamilies.getGraphEdgeVersions(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
+                     .putColumn( column, isDeleted );
+
+
+                if ( !isDeleted ) {
+                    writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
+                }
+            }
+        }.createBatch( scope, shards, timestamp );
+    }
+
+
+    @Override
+    public MutationBatch deleteEdgeFromSource( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                               final MarkedEdge markedEdge, final Collection<Shard> shards,
+                                               final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
+
+        return new SourceWriteOp( columnFamilies, markedEdge ) {
+
+            @Override
+            void writeEdge( final MutationBatch batch,
+                            final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> columnFamily,
+                            final ApplicationScope scope, final RowKey rowKey, final DirectedEdge edge,
+                            final Shard shard, final boolean isDeleted ) {
+
+                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).deleteColumn( edge );
+                writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
+            }
+        }.createBatch( scope, shards, timestamp );
+    }
+
+
+    @Override
+    public MutationBatch deleteEdgeFromSourceWithTargetType( final EdgeColumnFamilies columnFamilies,
+                                                             final ApplicationScope scope, final MarkedEdge markedEdge,
+                                                             final Collection<Shard> shards,
+                                                             final DirectedEdgeMeta directedEdgeMeta,
+                                                             final UUID timestamp ) {
+        return new SourceTargetTypeWriteOp( columnFamilies, markedEdge ) {
+
+            @Override
+            void writeEdge( final MutationBatch batch,
+                            final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily,
+                            final ApplicationScope scope, final RowKeyType rowKey, final DirectedEdge edge,
+                            final Shard shard, final boolean isDeleted ) {
+
+
+                batch.withRow( columnFamilies.getSourceNodeTargetTypeCfName(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
+                     .deleteColumn( edge );
+                writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
+            }
+        }.createBatch( scope, shards, timestamp );
+    }
+
+
+    @Override
+    public MutationBatch deleteEdgeToTarget( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                             final MarkedEdge markedEdge, final Collection<Shard> shards,
+                                             final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
+
+        return new TargetWriteOp( columnFamilies, markedEdge ) {
+
+            @Override
+            void writeEdge( final MutationBatch batch,
+                            final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> columnFamily,
+                            final ApplicationScope scope, final RowKey rowKey, final DirectedEdge edge,
+                            final Shard shard, final boolean isDeleted ) {
+
+                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).deleteColumn( edge );
+                writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
+            }
+        }.createBatch( scope, shards, timestamp );
+    }
+
+
+    @Override
+    public MutationBatch deleteEdgeToTargetWithSourceType( final EdgeColumnFamilies columnFamilies,
+                                                           final ApplicationScope scope, final MarkedEdge markedEdge,
+                                                           final Collection<Shard> shards,
+                                                           final DirectedEdgeMeta directedEdgeMeta,
+                                                           final UUID timestamp ) {
+
+        return new TargetSourceTypeWriteOp( columnFamilies, markedEdge ) {
+
+            @Override
+            void writeEdge( final MutationBatch batch,
+                            final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily,
+                            final ApplicationScope scope, final RowKeyType rowKey, final DirectedEdge edge,
+                            final Shard shard, final boolean isDeleted ) {
+
+                batch.withRow( columnFamilies.getTargetNodeSourceTypeCfName(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
+                     .deleteColumn( edge );
+                writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
+            }
+        }.createBatch( scope, shards, timestamp );
+    }
+
+
+    @Override
+    public MutationBatch deleteEdgeVersions( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                             final MarkedEdge markedEdge, final Collection<Shard> shards,
+                                             final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
+
+        return new EdgeVersions( columnFamilies, markedEdge ) {
+
+            @Override
+            void writeEdge( final MutationBatch batch,
+                            final MultiTennantColumnFamily<ScopedRowKey<EdgeRowKey>, Long> columnFamily,
+                            final ApplicationScope scope, final EdgeRowKey rowKey, final Long column, final Shard shard,
+                            final boolean isDeleted ) {
+                batch.withRow( columnFamilies.getGraphEdgeVersions(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
+                     .deleteColumn( column );
+                writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
+            }
+        }.createBatch( scope, shards, timestamp );
+    }
+
+
+    @Override
+    public Iterator<MarkedEdge> getEdgeVersions( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                                 final SearchByEdge search, final Collection<Shard> shards ) {
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateSearchByEdge( search );
+
+        final Id targetId = search.targetNode();
+        final Id sourceId = search.sourceNode();
+        final String type = search.getType();
+        final long maxTimestamp = search.getMaxTimestamp();
+        final MultiTennantColumnFamily<ScopedRowKey<EdgeRowKey>, Long> columnFamily =
+                columnFamilies.getGraphEdgeVersions();
+        final Serializer<Long> serializer = columnFamily.getColumnSerializer();
+
+
+        final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( DescendingTimestampComparator.INSTANCE, search.getOrder());
+
+
+
+
+        final EdgeSearcher<EdgeRowKey, Long, MarkedEdge> searcher =
+                new EdgeSearcher<EdgeRowKey, Long, MarkedEdge>( scope, shards, search.getOrder(),  comparator, maxTimestamp,
+                        search.last().transform( TRANSFORM ) ) {
+
+
+                    @Override
+                    protected Serializer<Long> getSerializer() {
+                        return serializer;
+                    }
+
+
+                    @Override
+                    public void buildRange( final RangeBuilder builder ) {
+
+
+                        if ( last.isPresent() ) {
+                            super.buildRange( builder );
+                            return;
+                        }
+
+                        //start seeking at a value < our max version
+                        builder.setStart( maxTimestamp );
+                    }
+
+
+                    @Override
+                    protected EdgeRowKey generateRowKey( long shard ) {
+                        return new EdgeRowKey( sourceId, type, targetId, shard );
+                    }
+
+
+                    @Override
+                    protected Long createColumn( final MarkedEdge last ) {
+                        return last.getTimestamp();
+                    }
+
+
+                    @Override
+                    protected MarkedEdge createEdge( final Long column, final boolean marked ) {
+                        return new SimpleMarkedEdge( sourceId, type, targetId, column.longValue(), marked );
+                    }
+
+
+
+                };
+
+        return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+                graphFig.getScanPageSize() );
+    }
+
+
+    @Override
+    public Iterator<MarkedEdge> getEdgesFromSource( final EdgeColumnFamilies columnFamilies,
+                                                    final ApplicationScope scope, final SearchByEdgeType search,
+                                                    final Collection<Shard> shards ) {
+
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateSearchByEdgeType( search );
+
+        final Id sourceId = search.getNode();
+        final String type = search.getType();
+        final long maxTimestamp = search.getMaxTimestamp();
+        final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> columnFamily =
+                columnFamilies.getSourceNodeCfName();
+        final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
+
+
+        final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( TargetDirectedEdgeDescendingComparator.INSTANCE, search.getOrder());
+
+
+
+        final EdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
+                new EdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(), comparator, maxTimestamp,
+                        search.last().transform( TRANSFORM ) ) {
+
+
+                    @Override
+                    protected Serializer<DirectedEdge> getSerializer() {
+                        return serializer;
+                    }
+
+
+                    @Override
+                    protected RowKey generateRowKey( long shard ) {
+                        return new RowKey( sourceId, type, shard );
+                    }
+
+
+                    @Override
+                    protected DirectedEdge createColumn( final MarkedEdge last ) {
+                        return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
+                    }
+
+
+                    @Override
+                    protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
+                        return new SimpleMarkedEdge( sourceId, type, edge.id, edge.timestamp, marked );
+                    }
+                };
+
+
+        return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+                graphFig.getScanPageSize() );
+    }
+
+
+    @Override
+    public Iterator<MarkedEdge> getEdgesFromSourceByTargetType( final EdgeColumnFamilies columnFamilies,
+                                                                final ApplicationScope scope,
+                                                                final SearchByIdType search,
+                                                                final Collection<Shard> shards ) {
+
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateSearchByEdgeType( search );
+
+        final Id targetId = search.getNode();
+        final String type = search.getType();
+        final String targetType = search.getIdType();
+        final long maxTimestamp = search.getMaxTimestamp();
+        final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily =
+                columnFamilies.getSourceNodeTargetTypeCfName();
+        final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
+
+        final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( TargetDirectedEdgeDescendingComparator.INSTANCE, search.getOrder());
+
+
+        final EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
+                new EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(), comparator, maxTimestamp,
+                        search.last().transform( TRANSFORM ) ) {
+
+                    @Override
+                    protected Serializer<DirectedEdge> getSerializer() {
+                        return serializer;
+                    }
+
+
+                    @Override
+                    protected RowKeyType generateRowKey( long shard ) {
+                        return new RowKeyType( targetId, type, targetType, shard );
+                    }
+
+
+                    @Override
+                    protected DirectedEdge createColumn( final MarkedEdge last ) {
+                        return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
+                    }
+
+
+                    @Override
+                    protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
+                        return new SimpleMarkedEdge( targetId, type, edge.id, edge.timestamp, marked );
+                    }
+                };
+
+        return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+                graphFig.getScanPageSize() );
+    }
+
+
+    @Override
+    public Iterator<MarkedEdge> getEdgesToTarget( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                                  final SearchByEdgeType search, final Collection<Shard> shards ) {
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateSearchByEdgeType( search );
+
+        final Id targetId = search.getNode();
+        final String type = search.getType();
+        final long maxTimestamp = search.getMaxTimestamp();
+        final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> columnFamily =
+                columnFamilies.getTargetNodeCfName();
+        final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
+
+        final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( SourceDirectedEdgeDescendingComparator.INSTANCE, search.getOrder());
+
+        final EdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
+                new EdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(), comparator, maxTimestamp,
+                        search.last().transform( TRANSFORM ) ) {
+
+                    @Override
+                    protected Serializer<DirectedEdge> getSerializer() {
+                        return serializer;
+                    }
+
+
+                    @Override
+                    protected RowKey generateRowKey( long shard ) {
+                        return new RowKey( targetId, type, shard );
+                    }
+
+
+                    @Override
+                    protected DirectedEdge createColumn( final MarkedEdge last ) {
+                        return new DirectedEdge( last.getSourceNode(), last.getTimestamp() );
+                    }
+
+
+                    @Override
+                    protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
+                        return new SimpleMarkedEdge( edge.id, type, targetId, edge.timestamp, marked );
+                    }
+                };
+
+
+        return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+                graphFig.getScanPageSize() );
+    }
+
+
+    @Override
+    public Iterator<MarkedEdge> getEdgesToTargetBySourceType( final EdgeColumnFamilies columnFamilies,
+                                                              final ApplicationScope scope,
+                                                              final SearchByIdType search,
+                                                              final Collection<Shard> shards ) {
+
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateSearchByEdgeType( search );
+
+        final Id targetId = search.getNode();
+        final String sourceType = search.getIdType();
+        final String type = search.getType();
+        final long maxTimestamp = search.getMaxTimestamp();
+        final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily =
+                columnFamilies.getTargetNodeSourceTypeCfName();
+        final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
+
+        final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( SourceDirectedEdgeDescendingComparator.INSTANCE, search.getOrder());
+
+
+        final EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
+                new EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(), comparator, maxTimestamp,
+                        search.last().transform( TRANSFORM ) ) {
+                    @Override
+                    protected Serializer<DirectedEdge> getSerializer() {
+                        return serializer;
+                    }
+
+
+                    @Override
+                    protected RowKeyType generateRowKey( final long shard ) {
+                        return new RowKeyType( targetId, type, sourceType, shard );
+                    }
+
+
+                    @Override
+                    protected DirectedEdge createColumn( final MarkedEdge last ) {
+                        //columns in the target node source type CF store the source node, so resume paging from it
+                        return new DirectedEdge( last.getSourceNode(), last.getTimestamp() );
+                    }
+
+
+                    @Override
+                    protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
+                        return new SimpleMarkedEdge( edge.id, type, targetId, edge.timestamp, marked );
+                    }
+                };
+
+        return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+                graphFig.getScanPageSize() );
+    }
+
+
+
+
+
+    /**
+     * Simple callback to perform puts and deletes with a common row setup code
+     *
+     * @param <R> The row key type
+     * @param <C> The column type
+     */
+    private abstract class RowOp<R, C> {
+
+
+        /**
+         * Return the column family used for the write
+         */
+        protected abstract MultiTennantColumnFamily<ScopedRowKey<R>, C> getColumnFamily();
+
+        /**
+         * Get the row key
+         */
+        public abstract R getRowKey( final Shard shard );
+
+        /**
+         * Get the column value to write
+         */
+        protected abstract C getDirectedEdge();
+
+        /**
+         * Return whether the edge is marked as deleted
+         */
+        protected abstract boolean isDeleted();
+
+
+        /**
+         * Write the edge with the given data
+         */
+        abstract void writeEdge( final MutationBatch batch,
+                                 final MultiTennantColumnFamily<ScopedRowKey<R>, C> columnFamily,
+                                 final ApplicationScope scope, final R rowKey, final C column, final Shard shard,
+                                 final boolean isDeleted );
+
+
+        /**
+         * Create a mutation batch
+         */
+        public MutationBatch createBatch( final ApplicationScope scope, final Collection<Shard> shards,
+                                          final UUID opTimestamp ) {
+
+            final MutationBatch batch =
+                    keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() )
+                            .withTimestamp( opTimestamp.timestamp() );
+
+
+            final C column = getDirectedEdge();
+            final MultiTennantColumnFamily<ScopedRowKey<R>, C> columnFamily = getColumnFamily();
+            final boolean isDeleted = isDeleted();
+
+
+            for ( Shard shard : shards ) {
+                final R rowKey = getRowKey( shard );
+                writeEdge( batch, columnFamily, scope, rowKey, column, shard, isDeleted );
+            }
+
+
+            return batch;
+        }
+    }
+
+
+    /**
+     * Perform a write of the source->target
+     */
+    private abstract class SourceWriteOp extends RowOp<RowKey, DirectedEdge> {
+
+        private final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> columnFamily;
+        private final Id sourceNodeId;
+
+
+        private final String type;
+        private final boolean isDeleted;
+        private final DirectedEdge directedEdge;
+
+
+        /**
+         * Create the source write operation
+         */
+        private SourceWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+            this.columnFamily = edgeColumnFamilies.getSourceNodeCfName();
+
+            this.sourceNodeId = markedEdge.getSourceNode();
+
+            this.type = markedEdge.getType();
+            this.isDeleted = markedEdge.isDeleted();
+
+            this.directedEdge = new DirectedEdge( markedEdge.getTargetNode(), markedEdge.getTimestamp() );
+        }
+
+
+        @Override
+        protected MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> getColumnFamily() {
+            return columnFamily;
+        }
+
+
+        @Override
+        public RowKey getRowKey( final Shard shard ) {
+            return new RowKey( sourceNodeId, type, shard.getShardIndex() );
+        }
+
+
+        @Override
+        protected DirectedEdge getDirectedEdge() {
+            return directedEdge;
+        }
+
+
+        @Override
+        protected boolean isDeleted() {
+            return isDeleted;
+        }
+    }
+
+
+    /**
+     * Perform a write of the source->target with target type
+     */
+    private abstract class SourceTargetTypeWriteOp extends RowOp<RowKeyType, DirectedEdge> {
+
+        private final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily;
+        private final Id sourceNodeId;
+        private final String type;
+        private Id targetId;
+        private final boolean isDeleted;
+        private final DirectedEdge directedEdge;
+
+
+        /**
+         * Create the source write operation with the target type
+         */
+        private SourceTargetTypeWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+            this.columnFamily = edgeColumnFamilies.getSourceNodeTargetTypeCfName();
+
+            this.sourceNodeId = markedEdge.getSourceNode();
+
+            this.type = markedEdge.getType();
+            this.targetId = markedEdge.getTargetNode();
+            this.isDeleted = markedEdge.isDeleted();
+
+            this.directedEdge = new DirectedEdge( targetId, markedEdge.getTimestamp() );
+        }
+
+
+        @Override
+        protected MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> getColumnFamily() {
+            return columnFamily;
+        }
+
+
+        @Override
+        public RowKeyType getRowKey( final Shard shard ) {
+            return new RowKeyType( sourceNodeId, type, targetId, shard.getShardIndex() );
+        }
+
+
+        @Override
+        protected DirectedEdge getDirectedEdge() {
+            return directedEdge;
+        }
+
+
+        @Override
+        protected boolean isDeleted() {
+            return isDeleted;
+        }
+    }
+
+
+    /**
+     * Perform a write of the target <-- source
+     */
+    private abstract class TargetWriteOp extends RowOp<RowKey, DirectedEdge> {
+
+        private final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> columnFamily;
+        private final Id targetNode;
+
+
+        private final String type;
+        private final boolean isDeleted;
+        private final DirectedEdge directedEdge;
+
+
+        /**
+         * Create the target write operation
+         */
+        private TargetWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+            this.columnFamily = edgeColumnFamilies.getTargetNodeCfName();
+
+            this.targetNode = markedEdge.getTargetNode();
+
+            this.type = markedEdge.getType();
+            this.isDeleted = markedEdge.isDeleted();
+
+            this.directedEdge = new DirectedEdge( markedEdge.getSourceNode(), markedEdge.getTimestamp() );
+        }
+
+
+        @Override
+        protected MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> getColumnFamily() {
+            return columnFamily;
+        }
+
+
+        @Override
+        public RowKey getRowKey( final Shard shard ) {
+            return new RowKey( targetNode, type, shard.getShardIndex() );
+        }
+
+
+        @Override
+        protected DirectedEdge getDirectedEdge() {
+            return directedEdge;
+        }
+
+
+        @Override
+        protected boolean isDeleted() {
+            return isDeleted;
+        }
+    }
+
+
+    /**
+     * Perform a write of the target<--source with source type
+     */
+    private abstract class TargetSourceTypeWriteOp extends RowOp<RowKeyType, DirectedEdge> {
+
+        private final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily;
+        private final Id targetNode;
+
+        private final Id sourceNode;
+
+        final String type;
+
+        final boolean isDeleted;
+        final DirectedEdge directedEdge;
+
+
+        /**
+         * Create the target write operation with the source type
+         */
+        private TargetSourceTypeWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+            //target <-- source rows with a source type live in the target node source type CF
+            this.columnFamily = edgeColumnFamilies.getTargetNodeSourceTypeCfName();
+
+            this.targetNode = markedEdge.getTargetNode();
+            this.sourceNode = markedEdge.getSourceNode();
+
+            this.type = markedEdge.getType();
+            this.isDeleted = markedEdge.isDeleted();
+
+            this.directedEdge = new DirectedEdge( sourceNode, markedEdge.getTimestamp() );
+        }
+
+
+        @Override
+        protected MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> getColumnFamily() {
+            return columnFamily;
+        }
+
+
+        @Override
+        public RowKeyType getRowKey( final Shard shard ) {
+            return new RowKeyType( targetNode, type, sourceNode, shard.getShardIndex() );
+        }
+
+
+        @Override
+        protected DirectedEdge getDirectedEdge() {
+            return directedEdge;
+        }
+
+
+        @Override
+        protected boolean isDeleted() {
+            return isDeleted;
+        }
+    }
+
+
+    /**
+     * Perform a write of the edge versions
+     */
+    private abstract class EdgeVersions extends RowOp<EdgeRowKey, Long> {
+
+        private final MultiTennantColumnFamily<ScopedRowKey<EdgeRowKey>, Long> columnFamily;
+        private final Id targetNode;
+
+        private final Id sourceNode;
+
+        final String type;
+
+        final boolean isDeleted;
+        final Long edgeVersion;
+
+
+        /**
+         * Create the edge versions write operation
+         */
+        private EdgeVersions( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+            this.columnFamily = edgeColumnFamilies.getGraphEdgeVersions();
+
+            this.targetNode = markedEdge.getTargetNode();
+            this.sourceNode = markedEdge.getSourceNode();
+
+            this.type = markedEdge.getType();
+            this.isDeleted = markedEdge.isDeleted();
+
+            this.edgeVersion = markedEdge.getTimestamp();
+        }
+
+
+        @Override
+        protected MultiTennantColumnFamily<ScopedRowKey<EdgeRowKey>, Long> getColumnFamily() {
+            return columnFamily;
+        }
+
+
+        @Override
+        public EdgeRowKey getRowKey( final Shard shard ) {
+            return new EdgeRowKey( sourceNode, type, targetNode, shard.getShardIndex() );
+        }
+
+
+        @Override
+        protected Long getDirectedEdge() {
+            return edgeVersion;
+        }
+
+
+        @Override
+        protected boolean isDeleted() {
+            return isDeleted;
+        }
+    }
+
+
+
+
+
+
+    private static final Function<Edge, MarkedEdge> TRANSFORM = new Function<Edge, MarkedEdge>() {
+        @Nullable
+        @Override
+        public MarkedEdge apply( @Nullable final Edge input ) {
+
+            if ( input == null ) {
+                return null;
+            }
+
+            if ( input instanceof MarkedEdge ) {
+                return ( MarkedEdge ) input;
+            }
+
+            return new SimpleMarkedEdge( input.getSourceNode(), input.getType(), input.getTargetNode(),
+                    input.getTimestamp(), false );
+        }
+    };
+}

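All of the writeEdge*/deleteEdge* methods in the new ShardedEdgeSerializationImpl above funnel through the abstract RowOp<R, C> callback and its createBatch() loop. The sketch below is a rough illustration of that template-method shape using throwaway stand-in types (a RecordingBatch instead of the Astyanax MutationBatch, plain Strings instead of row keys and DirectedEdge columns); it shows the structure only, not the real serialization.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;

    //Stand-in "batch" (not the Astyanax MutationBatch): it just records what it was asked to do.
    final class RecordingBatch {
        final List<String> operations = new ArrayList<>();

        void put( final String rowKey, final String column ) {
            operations.add( "PUT " + rowKey + " -> " + column );
        }

        void delete( final String rowKey, final String column ) {
            operations.add( "DELETE " + rowKey + " -> " + column );
        }
    }

    //Template-method shape of the RowOp callback: the base class owns the loop over shards and
    //builds one row mutation per shard; subclasses derive the row key and column, and each call
    //site overrides writeEdge() to decide between a put and a delete.
    abstract class RowOpSketch {

        abstract String getRowKey( long shardIndex );

        abstract String getColumn();

        abstract void writeEdge( RecordingBatch batch, String rowKey, String column, long shardIndex );

        RecordingBatch createBatch( final Collection<Long> shardIndexes ) {
            final RecordingBatch batch = new RecordingBatch();
            final String column = getColumn();

            for ( final long shardIndex : shardIndexes ) {
                writeEdge( batch, getRowKey( shardIndex ), column, shardIndex );
            }

            return batch;
        }
    }

    public class RowOpSketchDemo {
        public static void main( final String[] args ) {
            //a "source -> target" edge written into shards 0 and 1000
            final RowOpSketch insertOp = new RowOpSketch() {
                @Override
                String getRowKey( final long shardIndex ) {
                    return "sourceNode|follows|" + shardIndex;
                }

                @Override
                String getColumn() {
                    return "targetNode@1440000000000";
                }

                @Override
                void writeEdge( final RecordingBatch batch, final String rowKey, final String column,
                                final long shardIndex ) {
                    batch.put( rowKey, column ); //a delete op would call batch.delete( rowKey, column ) instead
                }
            };

            final RecordingBatch batch = insertOp.createBatch( Arrays.asList( 0L, 1000L ) );
            batch.operations.forEach( System.out::println );
        }
    }

The design keeps the per-shard fan-out in one place: a new write or delete variant only has to say how its row key and column are built and whether the mutation is a put or a delete.
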
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
index f107307..8ccf809 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
@@ -32,14 +32,15 @@ import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
 import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardConsistency;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeShardRowKeySerializer;
 import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 
@@ -50,6 +51,7 @@ import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.model.ConsistencyLevel;
 import com.netflix.astyanax.query.RowQuery;
 import com.netflix.astyanax.serializers.LongSerializer;
 import com.netflix.astyanax.util.RangeBuilder;
@@ -62,8 +64,8 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
      * Edge shards
      */
     private static final MultiTennantColumnFamily<ScopedRowKey<DirectedEdgeMeta>, Long> EDGE_SHARDS =
-            new MultiTennantColumnFamily<>( "Edge_Shards",
-                    new ScopedRowKeySerializer<>( EdgeShardRowKeySerializer.INSTANCE ), LongSerializer.get() );
+        new MultiTennantColumnFamily<>( "Edge_Shards",
+            new ScopedRowKeySerializer<>( EdgeShardRowKeySerializer.INSTANCE ), LongSerializer.get() );
 
 
     private static final ShardColumnParser COLUMN_PARSER = new ShardColumnParser();
@@ -72,20 +74,22 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
     protected final Keyspace keyspace;
     protected final CassandraConfig cassandraConfig;
     protected final GraphFig graphFig;
+    protected final ShardConsistency shardConsistency;
 
 
     @Inject
     public EdgeShardSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
-                                       final GraphFig graphFig ) {
+                                       final GraphFig graphFig, final ShardConsistency shardConsistency ) {
         this.keyspace = keyspace;
         this.cassandraConfig = cassandraConfig;
         this.graphFig = graphFig;
+        this.shardConsistency = shardConsistency;
     }
 
 
     @Override
-    public MutationBatch writeShardMeta( final ApplicationScope scope,
-                                         final Shard shard,   final DirectedEdgeMeta metaData) {
+    public MutationBatch writeShardMeta( final ApplicationScope scope, final Shard shard,
+                                         final DirectedEdgeMeta metaData ) {
 
         ValidationUtils.validateApplicationScope( scope );
         GraphValidation.validateDirectedEdgeMeta( metaData );
@@ -98,7 +102,8 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
 
         final ScopedRowKey rowKey = ScopedRowKey.fromKey( scope.getApplication(), metaData );
 
-        final MutationBatch batch = keyspace.prepareMutationBatch();
+        final MutationBatch batch =
+            keyspace.prepareMutationBatch().withConsistencyLevel( shardConsistency.getShardWriteConsistency() );
 
         batch.withTimestamp( shard.getCreatedTime() ).withRow( EDGE_SHARDS, rowKey )
              .putColumn( shard.getShardIndex(), shard.isCompacted() );
@@ -108,8 +113,31 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
 
 
     @Override
-    public Iterator<Shard> getShardMetaData( final ApplicationScope scope,
-                                             final Optional<Shard> start,   final DirectedEdgeMeta metaData  ) {
+    public Iterator<Shard> getShardMetaDataLocal( final ApplicationScope scope, final Optional<Shard> start,
+                                                  final DirectedEdgeMeta metaData ) {
+
+        return getShardMetaDataInternal( scope, start, metaData, shardConsistency.getShardReadConsistency() );
+    }
+
+
+    @Override
+    public Iterator<Shard> getShardMetaDataAudit( final ApplicationScope scope, final Optional<Shard> start,
+                                                  final DirectedEdgeMeta directedEdgeMeta ) {
+        return getShardMetaDataInternal( scope, start, directedEdgeMeta, shardConsistency.getShardAuditConsistency() );
+    }
+
+
+    /**
+     * Get the shard meta data, allowing the caller to specify the consistency level.
+     *
+     * @param scope the application scope that owns the shards
+     * @param start the optional shard to resume paging from
+     * @param metaData the directed edge meta data the shards belong to
+     * @param consistencyLevel the Cassandra consistency level to use for the read
+     * @return an iterator over the persisted shards
+     */
+    private Iterator<Shard> getShardMetaDataInternal( final ApplicationScope scope, final Optional<Shard> start,
+                                                      final DirectedEdgeMeta metaData,
+                                                      final ConsistencyLevel consistencyLevel ) {
 
         ValidationUtils.validateApplicationScope( scope );
         GraphValidation.validateDirectedEdgeMeta( metaData );
@@ -134,8 +162,8 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
 
 
         final RowQuery<ScopedRowKey<DirectedEdgeMeta>, Long> query =
-                keyspace.prepareQuery( EDGE_SHARDS ).setConsistencyLevel( cassandraConfig.getReadCL() ).getKey( rowKey )
-                        .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
+            keyspace.prepareQuery( EDGE_SHARDS ).setConsistencyLevel( consistencyLevel ).getKey( rowKey )
+                    .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
 
 
         return new ColumnNameIterator<>( query, COLUMN_PARSER, false );
@@ -143,18 +171,17 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
 
 
     @Override
-    public MutationBatch removeShardMeta( final ApplicationScope scope,
-                                          final Shard shard,   final DirectedEdgeMeta metaData) {
-
+    public MutationBatch removeShardMeta( final ApplicationScope scope, final Shard shard,
+                                          final DirectedEdgeMeta metaData ) {
         ValidationUtils.validateApplicationScope( scope );
         GraphValidation.valiateShard( shard );
         GraphValidation.validateDirectedEdgeMeta( metaData );
 
 
-
         final ScopedRowKey rowKey = ScopedRowKey.fromKey( scope.getApplication(), metaData );
 
-        final MutationBatch batch = keyspace.prepareMutationBatch();
+        final MutationBatch batch =
+            keyspace.prepareMutationBatch().withConsistencyLevel( shardConsistency.getShardWriteConsistency( ));
 
         batch.withRow( EDGE_SHARDS, rowKey ).deleteColumn( shard.getShardIndex() );
 
@@ -164,19 +191,13 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
 
     @Override
     public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
-
-
         return Collections.singleton(
-                new MultiTennantColumnFamilyDefinition( EDGE_SHARDS, BytesType.class.getSimpleName(),
-                        ColumnTypes.LONG_TYPE_REVERSED, BytesType.class.getSimpleName(),
-                        MultiTennantColumnFamilyDefinition.CacheOption.KEYS ) );
+            new MultiTennantColumnFamilyDefinition( EDGE_SHARDS, BytesType.class.getSimpleName(),
+                ColumnTypes.LONG_TYPE_REVERSED, BytesType.class.getSimpleName(),
+                MultiTennantColumnFamilyDefinition.CacheOption.KEYS ) );
     }
 
 
-
-
-
-
     private static class ShardColumnParser implements ColumnParser<Long, Shard> {
 
         @Override

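The EdgeShardSerializationImpl changes above split shard-metadata reads into a "local" path and an "audit" path so each can run at its own consistency level, while writes take their level from the new ShardConsistency dependency. Below is a hedged sketch of that pattern with stand-in types (ConsistencySketch instead of the Astyanax ConsistencyLevel, a fake query instead of the Edge_Shards row read); the method names mirror the diff, but the bodies are illustrative only.

    //Stand-in for com.netflix.astyanax.model.ConsistencyLevel, for illustration only.
    enum ConsistencySketch { LOCAL_ONE, LOCAL_QUORUM, EACH_QUORUM }

    //Mirrors the role of the new ShardConsistency dependency: distinct levels for routine reads,
    //audit reads and writes of shard metadata.
    interface ShardConsistencySketch {
        ConsistencySketch getShardReadConsistency();
        ConsistencySketch getShardAuditConsistency();
        ConsistencySketch getShardWriteConsistency();
    }

    final class ShardMetaReaderSketch {

        private final ShardConsistencySketch shardConsistency;

        ShardMetaReaderSketch( final ShardConsistencySketch shardConsistency ) {
            this.shardConsistency = shardConsistency;
        }

        //fast, possibly weaker read used on the hot path
        Iterable<Long> getShardMetaDataLocal( final String rowKey ) {
            return getShardMetaDataInternal( rowKey, shardConsistency.getShardReadConsistency() );
        }

        //stronger read used before deciding to allocate or compact a shard
        Iterable<Long> getShardMetaDataAudit( final String rowKey ) {
            return getShardMetaDataInternal( rowKey, shardConsistency.getShardAuditConsistency() );
        }

        //both entry points share one implementation; only the consistency level differs
        private Iterable<Long> getShardMetaDataInternal( final String rowKey, final ConsistencySketch level ) {
            System.out.println( "querying " + rowKey + " at " + level );
            return java.util.Collections.emptyList(); //a real implementation would page the Edge_Shards row here
        }
    }

    public class ShardConsistencySketchDemo {
        public static void main( final String[] args ) {
            final ShardConsistencySketch levels = new ShardConsistencySketch() {
                @Override
                public ConsistencySketch getShardReadConsistency() { return ConsistencySketch.LOCAL_ONE; }

                @Override
                public ConsistencySketch getShardAuditConsistency() { return ConsistencySketch.EACH_QUORUM; }

                @Override
                public ConsistencySketch getShardWriteConsistency() { return ConsistencySketch.LOCAL_QUORUM; }
            };

            final ShardMetaReaderSketch reader = new ShardMetaReaderSketch( levels );
            reader.getShardMetaDataLocal( "app|node|edgeType" );
            reader.getShardMetaDataAudit( "app|node|edgeType" );
        }
    }

The write-consistency getter is unused by the reader here because, as in the diff, it belongs to the mutation path (writeShardMeta/removeShardMeta).
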
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 80c6a5f..592d308 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -32,7 +32,6 @@ import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
@@ -46,6 +45,8 @@ import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.inject.Inject;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
@@ -62,6 +63,8 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
     private static final Shard MIN_SHARD = new Shard( 0, 0, true );
 
+    private static final NoOpCompaction NO_OP_COMPACTION = new NoOpCompaction();
+
     private final EdgeShardSerialization edgeShardSerialization;
     private final EdgeColumnFamilies edgeColumnFamilies;
     private final ShardedEdgeSerialization shardedEdgeSerialization;
@@ -103,7 +106,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
         }
 
         else {
-            existingShards = edgeShardSerialization.getShardMetaData( scope, maxShardId, directedEdgeMeta );
+            existingShards = edgeShardSerialization.getShardMetaDataLocal( scope, maxShardId, directedEdgeMeta );
         }
 
         if ( existingShards == null || !existingShards.hasNext() ) {
@@ -120,8 +123,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
             existingShards = Collections.singleton( MIN_SHARD ).iterator();
         }
 
-        return new ShardEntryGroupIterator( existingShards, shardGroupCompaction, scope,
-                directedEdgeMeta );
+        return new ShardEntryGroupIterator( existingShards, shardGroupCompaction, scope, directedEdgeMeta );
     }
 
 
@@ -135,6 +137,10 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
         Preconditions.checkNotNull( shardEntryGroup, "shardEntryGroup cannot be null" );
 
+        //We have to read our state from Cassandra first to ensure we have an up-to-date view from other regions
+
+
+
 
         /**
          * Nothing to do, it's been created very recently, we don't create a new one
@@ -155,11 +161,6 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
         final Shard shard = shardEntryGroup.getMinShard();
 
 
-        if ( shard.getCreatedTime() >= getMinTime() ) {
-            return false;
-        }
-
-
         /**
          * Check out if we have a count for our shard allocation
          */
@@ -173,8 +174,8 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
             return false;
         }
 
-        if(LOG.isDebugEnabled()){
-            LOG.debug("Count of {} has exceeded shard config of {} will begin compacting", count, shardSize);
+        if ( LOG.isDebugEnabled() ) {
+            LOG.debug( "Count of {} has exceeded shard config of {} will begin compacting", count, shardSize );
         }
 
         /**
@@ -193,13 +194,13 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
          */
 
         final Iterator<MarkedEdge> edges = directedEdgeMeta
-                .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup.getReadShards(), 0,
-                        SearchByEdgeType.Order.ASCENDING );
+            .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup.getReadShards(), 0,
+                SearchByEdgeType.Order.ASCENDING );
 
 
         if ( !edges.hasNext() ) {
-            LOG.warn( "Tried to allocate a new shard for edge meta data {}, "
-                    + "but no max value could be found in that row", directedEdgeMeta );
+            LOG.warn( "Tried to allocate a new shard for edge meta data {}, " + "but no max value could be found in that row",
+                directedEdgeMeta );
             return false;
         }
 
@@ -214,12 +215,12 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
          */
 
 
-        for(long i = 1;  edges.hasNext(); i++){
+        for ( long i = 1; edges.hasNext(); i++ ) {
             //we hit a pivot shard, set it since it could be the last one we encounter
-            if(i% shardSize == 0){
+            if ( i % shardSize == 0 ) {
                 marked = edges.next();
             }
-            else{
+            else {
                 edges.next();
             }
         }
@@ -228,8 +229,8 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
         /**
          * Sanity check in case our counters become severely out of sync with our edge state in cassandra.
          */
-        if(marked == null){
-            LOG.warn( "Incorrect shard count for shard group {}", shardEntryGroup );
+        if ( marked == null ) {
+            LOG.warn( "Incorrect shard count for shard group {}, ignoring", shardEntryGroup );
             return false;
         }
 
@@ -248,59 +249,61 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
             throw new RuntimeException( "Unable to connect to casandra", e );
         }
 
-
         return true;
     }
 
+    /**
+     * Return true if the node has been created within our timeout.  If this is the case, we don't need to check
+     * cassandra, we know it won't exist
+     */
+    private boolean isNewNode( DirectedEdgeMeta directedEdgeMeta ) {
 
-    @Override
-    public long getMinTime() {
 
-        final long minimumAllowed = 2 * graphFig.getShardCacheTimeout();
+        //TODO: TN this is broken....
+        //The timeout is in milliseconds.  Time for a time uuid is 1/10000 of a milli, so we need to get the units correct
+        final long timeNow = timeService.getCurrentTime();
 
-        final long minDelta = graphFig.getShardMinDelta();
+        boolean isNew = true;
 
+        for ( DirectedEdgeMeta.NodeMeta node : directedEdgeMeta.getNodes() ) {
 
-        if ( minDelta < minimumAllowed ) {
-            throw new GraphRuntimeException( String.format(
-                    "You must configure the property %s to be >= 2 x %s.  Otherwise you risk losing data",
-                    GraphFig.SHARD_MIN_DELTA, GraphFig.SHARD_CACHE_TIMEOUT ) );
-        }
+            //short circuit
+            if ( !isNew || node.getId().getUuid().version() > 2 ) {
+                return false;
+            }
 
-        return timeService.getCurrentTime() - minDelta;
-    }
+            final long uuidTime = TimeUUIDUtils.getTimeFromUUID( node.getId().getUuid() );
 
+            //take our uuid time and add 10 seconds; if the uuid is within 10 seconds of system time, we can consider it "new"
+            final long newExpirationTimeout = uuidTime + 10000;
 
-    /**
-     * Return true if the node has been created within our timeout.  If this is the case, we dont' need to check
-     * cassandra, we know it won't exist
-     */
-    private boolean isNewNode( DirectedEdgeMeta directedEdgeMeta ) {
+            //our expiration is after our current time, treat it as new
+            isNew = isNew && newExpirationTimeout > timeNow;
+        }
 
+        return isNew;
+    }
 
-        //TODO: TN this is broken....
-        //The timeout is in milliseconds.  Time for a time uuid is 1/10000 of a milli, so we need to get the units correct
-        final long timeoutDelta = graphFig.getShardCacheTimeout() ;
+    private ShardEntryGroupIterator getCurrentStateIterator( final ApplicationScope scope, final ShardEntryGroup shardEntryGroup,
+                                                             final DirectedEdgeMeta directedEdgeMeta ) {
 
-        final long timeNow = timeService.getCurrentTime();
+        final Shard start = shardEntryGroup.getMaxShard();
 
-        boolean isNew = true;
+        final Iterator<Shard> shards = this.edgeShardSerialization.getShardMetaDataAudit( scope, Optional.fromNullable( start ), directedEdgeMeta );
 
-        for ( DirectedEdgeMeta.NodeMeta node : directedEdgeMeta.getNodes() ) {
+        return new ShardEntryGroupIterator( shards, NO_OP_COMPACTION, scope, directedEdgeMeta );
+    }
 
-            //short circuit
-            if(!isNew || node.getId().getUuid().version() > 2){
-                return false;
-            }
 
-            final long uuidTime =   TimeUUIDUtils.getTimeFromUUID( node.getId().getUuid());
+    private static final class NoOpCompaction implements ShardGroupCompaction {
 
-            final long newExpirationTimeout = uuidTime + timeoutDelta;
+        @Override
+        public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
+                                                                 final DirectedEdgeMeta edgeMeta,
+                                                                 final ShardEntryGroup group ) {
 
-            //our expiration is after our current time, treat it as new
-            isNew = isNew && newExpirationTimeout >  timeNow;
+            //deliberately a no op
+            return Futures.immediateFuture( AuditResult.NOT_CHECKED );
         }
-
-        return isNew;
     }
 }
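
A note on the pivot selection in shouldAllocate() above: the loop walks the edges of the group once and remembers the last edge whose 1-based position is an exact multiple of the configured shard size; that edge becomes the boundary of the newly allocated shard, and a null result means the counter over-reported and no split is made. Below is a self-contained sketch of the same selection, using plain Long ids as hypothetical stand-ins for MarkedEdge.

import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class PivotSelectionSketch {

    //walk the edges once and remember the last element whose 1-based position is an exact multiple of shardSize
    static Long selectPivot( final Iterator<Long> edges, final long shardSize ) {
        Long marked = null;

        for ( long i = 1; edges.hasNext(); i++ ) {
            final Long edge = edges.next();

            if ( i % shardSize == 0 ) {
                //may still be replaced by a later multiple of shardSize, exactly like the loop in shouldAllocate()
                marked = edge;
            }
        }

        //null means fewer than shardSize edges were actually present, so no split should happen
        return marked;
    }


    public static void main( String[] args ) {
        final List<Long> edges = LongStream.rangeClosed( 1, 1234 ).boxed().collect( Collectors.toList() );

        //with a shard size of 500 the pivot is the 1000th edge, the last exact multiple encountered
        System.out.println( selectPivot( edges.iterator(), 500 ) );
    }
}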
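
The isNewNode() check above leans on the fact that a time-based UUID embeds its creation instant: a node is treated as "new" when that instant is within roughly ten seconds of the current clock, in which case the Cassandra read can be skipped. The patch accepts UUID versions 1 and 2 and reads the timestamp through Astyanax's TimeUUIDUtils; the minimal sketch below sticks to version 1 and pulls the timestamp straight out of java.util.UUID, with the 10-second window mirroring the hard-coded 10000 ms above.

import java.util.UUID;

public class TimeUuidRecencySketch {

    //offset between the UUID epoch (1582-10-15) and the Unix epoch (1970-01-01), in 100ns units
    private static final long UUID_EPOCH_OFFSET = 0x01B21DD213814000L;

    //convert a version 1 UUID's embedded timestamp to Unix epoch milliseconds
    static long uuidTimestampMillis( final UUID uuid ) {
        return ( uuid.timestamp() - UUID_EPOCH_OFFSET ) / 10000;
    }

    //true when the time-based uuid was generated within windowMillis of "now"
    static boolean isRecent( final UUID uuid, final long nowMillis, final long windowMillis ) {
        if ( uuid.version() != 1 ) {
            //non time-based uuids carry no timestamp, so they cannot be treated as "new"
            return false;
        }

        return uuidTimestampMillis( uuid ) + windowMillis > nowMillis;
    }


    public static void main( String[] args ) {
        //UUID.randomUUID() is version 4 and carries no timestamp, so this prints false
        System.out.println( isRecent( UUID.randomUUID(), System.currentTimeMillis(), 10000 ) );
    }
}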

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardConsistencyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardConsistencyImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardConsistencyImpl.java
new file mode 100644
index 0000000..c76fae6
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardConsistencyImpl.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardConsistency;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.model.ConsistencyLevel;
+
+
+/**
+ * Implementation wrapper that resolves the configured consistency level names into Astyanax ConsistencyLevel enums
+ */
+@Singleton
+public class ShardConsistencyImpl implements ShardConsistency {
+
+    private final GraphFig graphFig;
+
+
+    @Inject
+    public ShardConsistencyImpl( final GraphFig graphFig ) {this.graphFig = graphFig;}
+
+
+    @Override
+    public ConsistencyLevel getShardWriteConsistency() {
+        return ConsistencyLevel.valueOf( graphFig.getShardWriteConsistency() );
+    }
+
+
+    @Override
+    public ConsistencyLevel getShardReadConsistency() {
+        return null;
+    }
+
+
+    @Override
+    public ConsistencyLevel getShardAuditConsistency() {
+        return null;
+    }
+}
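
Only the write level of ShardConsistencyImpl is resolved from configuration at this point; getShardReadConsistency() and getShardAuditConsistency() still return null. The fragment below is a sketch of how the two methods would presumably be completed, mirroring the getShardWriteConsistency() call above. It assumes GraphFig also exposes getShardReadConsistency() and getShardAuditConsistency() string properties, which this diff does not show.

    //sketch only: assumes matching string getters exist on GraphFig; not confirmed by this diff
    @Override
    public ConsistencyLevel getShardReadConsistency() {
        return ConsistencyLevel.valueOf( graphFig.getShardReadConsistency() );
    }


    @Override
    public ConsistencyLevel getShardAuditConsistency() {
        return ConsistencyLevel.valueOf( graphFig.getShardAuditConsistency() );
    }

Note that ConsistencyLevel.valueOf() only accepts exact Astyanax constant names such as CL_LOCAL_QUORUM and throws IllegalArgumentException otherwise, so the configured defaults have to match the enum spelling exactly.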


[2/3] usergrid git commit: Updated tests and finished basic check + allocate algorithm

Posted by to...@apache.org.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
deleted file mode 100644
index 13f7427..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
+++ /dev/null
@@ -1,1006 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing,
- *  * software distributed under the License is distributed on an
- *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  * KIND, either express or implied.  See the License for the
- *  * specific language governing permissions and limitations
- *  * under the License.
- *
- */
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-
-
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.UUID;
-
-import javax.annotation.Nullable;
-import javax.inject.Inject;
-
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.consistency.TimeService;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.util.ValidationUtils;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.SearchByEdge;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.SearchByIdType;
-import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.DescendingTimestampComparator;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.OrderedComparator;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators
-        .SourceDirectedEdgeDescendingComparator;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators
-        .TargetDirectedEdgeDescendingComparator;
-import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.Serializer;
-import com.netflix.astyanax.util.RangeBuilder;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-
-/**
- * TODO: Rafactor this to use shards only, no shard groups, just collections of shards.  The parent caller can aggregate
- * the results of multiple groups together, this has an impedance mismatch in the API layer.
- */
-@Singleton
-public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
-
-    protected final Keyspace keyspace;
-    protected final CassandraConfig cassandraConfig;
-    protected final GraphFig graphFig;
-    protected final EdgeShardStrategy writeEdgeShardStrategy;
-    protected final TimeService timeService;
-
-
-    @Inject
-    public ShardedEdgeSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
-                                         final GraphFig graphFig, final EdgeShardStrategy writeEdgeShardStrategy,
-                                         final TimeService timeService ) {
-
-
-        checkNotNull( "keyspace required", keyspace );
-        checkNotNull( "cassandraConfig required", cassandraConfig );
-        checkNotNull( "consistencyFig required", graphFig );
-        checkNotNull( "writeEdgeShardStrategy required", writeEdgeShardStrategy );
-        checkNotNull( "timeService required", timeService );
-
-
-        this.keyspace = keyspace;
-        this.cassandraConfig = cassandraConfig;
-        this.graphFig = graphFig;
-        this.writeEdgeShardStrategy = writeEdgeShardStrategy;
-        this.timeService = timeService;
-    }
-
-
-    @Override
-    public MutationBatch writeEdgeFromSource( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
-                                              final MarkedEdge markedEdge, final Collection<Shard> shards,
-                                              final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
-        ValidationUtils.validateApplicationScope( scope );
-        GraphValidation.validateEdge( markedEdge );
-        ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
-
-        return new SourceWriteOp( columnFamilies, markedEdge ) {
-
-            @Override
-            void writeEdge( final MutationBatch batch,
-                            final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> columnFamily,
-                            final ApplicationScope scope, final RowKey rowKey, final DirectedEdge edge,
-                            final Shard shard, final boolean isDeleted ) {
-
-                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).putColumn( edge, isDeleted );
-
-                if ( !isDeleted ) {
-                    writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
-                }
-            }
-        }.createBatch( scope, shards, timestamp );
-    }
-
-
-    @Override
-    public MutationBatch writeEdgeFromSourceWithTargetType( final EdgeColumnFamilies columnFamilies,
-                                                            final ApplicationScope scope, final MarkedEdge markedEdge,
-                                                            final Collection<Shard> shards,
-                                                            final DirectedEdgeMeta directedEdgeMeta,
-                                                            final UUID timestamp ) {
-        ValidationUtils.validateApplicationScope( scope );
-        GraphValidation.validateEdge( markedEdge );
-        ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
-
-
-        return new SourceTargetTypeWriteOp( columnFamilies, markedEdge ) {
-
-            @Override
-            void writeEdge( final MutationBatch batch,
-                            final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily,
-                            final ApplicationScope scope, final RowKeyType rowKey, final DirectedEdge edge,
-                            final Shard shard, final boolean isDeleted ) {
-
-
-                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).putColumn( edge, isDeleted );
-
-
-                if ( !isDeleted ) {
-                    writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
-                }
-            }
-        }.createBatch( scope, shards, timestamp );
-    }
-
-
-    @Override
-    public MutationBatch writeEdgeToTarget( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
-                                            final MarkedEdge markedEdge, final Collection<Shard> shards,
-                                            final DirectedEdgeMeta targetEdgeMeta, final UUID timestamp ) {
-        ValidationUtils.validateApplicationScope( scope );
-        GraphValidation.validateEdge( markedEdge );
-        ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
-
-
-        return new TargetWriteOp( columnFamilies, markedEdge ) {
-
-            @Override
-            void writeEdge( final MutationBatch batch,
-                            final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> columnFamily,
-                            final ApplicationScope scope, final RowKey rowKey, final DirectedEdge edge,
-                            final Shard shard, final boolean isDeleted ) {
-
-                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).putColumn( edge, isDeleted );
-
-
-                if ( !isDeleted ) {
-                    writeEdgeShardStrategy.increment( scope, shard, 1, targetEdgeMeta );
-                }
-            }
-        }.createBatch( scope, shards, timestamp );
-    }
-
-
-    @Override
-    public MutationBatch writeEdgeToTargetWithSourceType( final EdgeColumnFamilies columnFamilies,
-                                                          final ApplicationScope scope, final MarkedEdge markedEdge,
-                                                          final Collection<Shard> shards,
-                                                          final DirectedEdgeMeta directedEdgeMeta,
-                                                          final UUID timestamp ) {
-        ValidationUtils.validateApplicationScope( scope );
-        GraphValidation.validateEdge( markedEdge );
-        ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
-
-
-        return new TargetSourceTypeWriteOp( columnFamilies, markedEdge ) {
-
-            @Override
-            void writeEdge( final MutationBatch batch,
-                            final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily,
-                            final ApplicationScope scope, final RowKeyType rowKey, final DirectedEdge edge,
-                            final Shard shard, final boolean isDeleted ) {
-
-                batch.withRow( columnFamilies.getTargetNodeSourceTypeCfName(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
-                     .putColumn( edge, isDeleted );
-
-
-                if ( !isDeleted ) {
-                    writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
-                }
-            }
-        }.createBatch( scope, shards, timestamp );
-    }
-
-
-    @Override
-    public MutationBatch writeEdgeVersions( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
-                                            final MarkedEdge markedEdge, final Collection<Shard> shards,
-                                            final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
-
-        ValidationUtils.validateApplicationScope( scope );
-        GraphValidation.validateEdge( markedEdge );
-        ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
-
-
-        return new EdgeVersions( columnFamilies, markedEdge ) {
-
-            @Override
-            void writeEdge( final MutationBatch batch,
-                            final MultiTennantColumnFamily<ScopedRowKey<EdgeRowKey>, Long> columnFamily,
-                            final ApplicationScope scope, final EdgeRowKey rowKey, final Long column, final Shard shard,
-                            final boolean isDeleted ) {
-                batch.withRow( columnFamilies.getGraphEdgeVersions(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
-                     .putColumn( column, isDeleted );
-
-
-                if ( !isDeleted ) {
-                    writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
-                }
-            }
-        }.createBatch( scope, shards, timestamp );
-    }
-
-
-    @Override
-    public MutationBatch deleteEdgeFromSource( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
-                                               final MarkedEdge markedEdge, final Collection<Shard> shards,
-                                               final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
-
-        return new SourceWriteOp( columnFamilies, markedEdge ) {
-
-            @Override
-            void writeEdge( final MutationBatch batch,
-                            final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> columnFamily,
-                            final ApplicationScope scope, final RowKey rowKey, final DirectedEdge edge,
-                            final Shard shard, final boolean isDeleted ) {
-
-                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).deleteColumn( edge );
-                writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
-            }
-        }.createBatch( scope, shards, timestamp );
-    }
-
-
-    @Override
-    public MutationBatch deleteEdgeFromSourceWithTargetType( final EdgeColumnFamilies columnFamilies,
-                                                             final ApplicationScope scope, final MarkedEdge markedEdge,
-                                                             final Collection<Shard> shards,
-                                                             final DirectedEdgeMeta directedEdgeMeta,
-                                                             final UUID timestamp ) {
-        return new SourceTargetTypeWriteOp( columnFamilies, markedEdge ) {
-
-            @Override
-            void writeEdge( final MutationBatch batch,
-                            final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily,
-                            final ApplicationScope scope, final RowKeyType rowKey, final DirectedEdge edge,
-                            final Shard shard, final boolean isDeleted ) {
-
-
-                batch.withRow( columnFamilies.getSourceNodeTargetTypeCfName(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
-                     .deleteColumn( edge );
-                writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
-            }
-        }.createBatch( scope, shards, timestamp );
-    }
-
-
-    @Override
-    public MutationBatch deleteEdgeToTarget( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
-                                             final MarkedEdge markedEdge, final Collection<Shard> shards,
-                                             final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
-
-        return new TargetWriteOp( columnFamilies, markedEdge ) {
-
-            @Override
-            void writeEdge( final MutationBatch batch,
-                            final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> columnFamily,
-                            final ApplicationScope scope, final RowKey rowKey, final DirectedEdge edge,
-                            final Shard shard, final boolean isDeleted ) {
-
-                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).deleteColumn( edge );
-                writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
-            }
-        }.createBatch( scope, shards, timestamp );
-    }
-
-
-    @Override
-    public MutationBatch deleteEdgeToTargetWithSourceType( final EdgeColumnFamilies columnFamilies,
-                                                           final ApplicationScope scope, final MarkedEdge markedEdge,
-                                                           final Collection<Shard> shards,
-                                                           final DirectedEdgeMeta directedEdgeMeta,
-                                                           final UUID timestamp ) {
-
-        return new TargetSourceTypeWriteOp( columnFamilies, markedEdge ) {
-
-            @Override
-            void writeEdge( final MutationBatch batch,
-                            final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily,
-                            final ApplicationScope scope, final RowKeyType rowKey, final DirectedEdge edge,
-                            final Shard shard, final boolean isDeleted ) {
-
-                batch.withRow( columnFamilies.getTargetNodeSourceTypeCfName(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
-                     .deleteColumn( edge );
-                writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
-            }
-        }.createBatch( scope, shards, timestamp );
-    }
-
-
-    @Override
-    public MutationBatch deleteEdgeVersions( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
-                                             final MarkedEdge markedEdge, final Collection<Shard> shards,
-                                             final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
-
-        return new EdgeVersions( columnFamilies, markedEdge ) {
-
-            @Override
-            void writeEdge( final MutationBatch batch,
-                            final MultiTennantColumnFamily<ScopedRowKey<EdgeRowKey>, Long> columnFamily,
-                            final ApplicationScope scope, final EdgeRowKey rowKey, final Long column, final Shard shard,
-                            final boolean isDeleted ) {
-                batch.withRow( columnFamilies.getGraphEdgeVersions(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
-                     .deleteColumn( column );
-                writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
-            }
-        }.createBatch( scope, shards, timestamp );
-    }
-
-
-    @Override
-    public Iterator<MarkedEdge> getEdgeVersions( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
-                                                 final SearchByEdge search, final Collection<Shard> shards ) {
-        ValidationUtils.validateApplicationScope( scope );
-        GraphValidation.validateSearchByEdge( search );
-
-        final Id targetId = search.targetNode();
-        final Id sourceId = search.sourceNode();
-        final String type = search.getType();
-        final long maxTimestamp = search.getMaxTimestamp();
-        final MultiTennantColumnFamily<ScopedRowKey<EdgeRowKey>, Long> columnFamily =
-                columnFamilies.getGraphEdgeVersions();
-        final Serializer<Long> serializer = columnFamily.getColumnSerializer();
-
-
-        final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( DescendingTimestampComparator.INSTANCE, search.getOrder());
-
-
-
-
-        final EdgeSearcher<EdgeRowKey, Long, MarkedEdge> searcher =
-                new EdgeSearcher<EdgeRowKey, Long, MarkedEdge>( scope, shards, search.getOrder(),  comparator, maxTimestamp,
-                        search.last().transform( TRANSFORM ) ) {
-
-
-                    @Override
-                    protected Serializer<Long> getSerializer() {
-                        return serializer;
-                    }
-
-
-                    @Override
-                    public void buildRange( final RangeBuilder builder ) {
-
-
-                        if ( last.isPresent() ) {
-                            super.buildRange( builder );
-                            return;
-                        }
-
-                        //start seeking at a value < our max version
-                        builder.setStart( maxTimestamp );
-                    }
-
-
-                    @Override
-                    protected EdgeRowKey generateRowKey( long shard ) {
-                        return new EdgeRowKey( sourceId, type, targetId, shard );
-                    }
-
-
-                    @Override
-                    protected Long createColumn( final MarkedEdge last ) {
-                        return last.getTimestamp();
-                    }
-
-
-                    @Override
-                    protected MarkedEdge createEdge( final Long column, final boolean marked ) {
-                        return new SimpleMarkedEdge( sourceId, type, targetId, column.longValue(), marked );
-                    }
-
-
-
-                };
-
-        return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
-                graphFig.getScanPageSize() );
-    }
-
-
-    @Override
-    public Iterator<MarkedEdge> getEdgesFromSource( final EdgeColumnFamilies columnFamilies,
-                                                    final ApplicationScope scope, final SearchByEdgeType search,
-                                                    final Collection<Shard> shards ) {
-
-        ValidationUtils.validateApplicationScope( scope );
-        GraphValidation.validateSearchByEdgeType( search );
-
-        final Id sourceId = search.getNode();
-        final String type = search.getType();
-        final long maxTimestamp = search.getMaxTimestamp();
-        final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> columnFamily =
-                columnFamilies.getSourceNodeCfName();
-        final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
-
-
-        final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( TargetDirectedEdgeDescendingComparator.INSTANCE, search.getOrder());
-
-
-
-        final EdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
-                new EdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(), comparator, maxTimestamp,
-                        search.last().transform( TRANSFORM ) ) {
-
-
-                    @Override
-                    protected Serializer<DirectedEdge> getSerializer() {
-                        return serializer;
-                    }
-
-
-                    @Override
-                    protected RowKey generateRowKey( long shard ) {
-                        return new RowKey( sourceId, type, shard );
-                    }
-
-
-                    @Override
-                    protected DirectedEdge createColumn( final MarkedEdge last ) {
-                        return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
-                    }
-
-
-                    @Override
-                    protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
-                        return new SimpleMarkedEdge( sourceId, type, edge.id, edge.timestamp, marked );
-                    }
-                };
-
-
-        return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
-                graphFig.getScanPageSize() );
-    }
-
-
-    @Override
-    public Iterator<MarkedEdge> getEdgesFromSourceByTargetType( final EdgeColumnFamilies columnFamilies,
-                                                                final ApplicationScope scope,
-                                                                final SearchByIdType search,
-                                                                final Collection<Shard> shards ) {
-
-        ValidationUtils.validateApplicationScope( scope );
-        GraphValidation.validateSearchByEdgeType( search );
-
-        final Id targetId = search.getNode();
-        final String type = search.getType();
-        final String targetType = search.getIdType();
-        final long maxTimestamp = search.getMaxTimestamp();
-        final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily =
-                columnFamilies.getSourceNodeTargetTypeCfName();
-        final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
-
-        final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( TargetDirectedEdgeDescendingComparator.INSTANCE, search.getOrder());
-
-
-        final EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
-                new EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(), comparator, maxTimestamp,
-                        search.last().transform( TRANSFORM ) ) {
-
-                    @Override
-                    protected Serializer<DirectedEdge> getSerializer() {
-                        return serializer;
-                    }
-
-
-                    @Override
-                    protected RowKeyType generateRowKey( long shard ) {
-                        return new RowKeyType( targetId, type, targetType, shard );
-                    }
-
-
-                    @Override
-                    protected DirectedEdge createColumn( final MarkedEdge last ) {
-                        return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
-                    }
-
-
-                    @Override
-                    protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
-                        return new SimpleMarkedEdge( targetId, type, edge.id, edge.timestamp, marked );
-                    }
-                };
-
-        return new ShardsColumnIterator( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
-                graphFig.getScanPageSize() );
-    }
-
-
-    @Override
-    public Iterator<MarkedEdge> getEdgesToTarget( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
-                                                  final SearchByEdgeType search, final Collection<Shard> shards ) {
-        ValidationUtils.validateApplicationScope( scope );
-        GraphValidation.validateSearchByEdgeType( search );
-
-        final Id targetId = search.getNode();
-        final String type = search.getType();
-        final long maxTimestamp = search.getMaxTimestamp();
-        final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> columnFamily =
-                columnFamilies.getTargetNodeCfName();
-        final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
-
-        final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( SourceDirectedEdgeDescendingComparator.INSTANCE, search.getOrder());
-
-        final EdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
-                new EdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(),comparator,  maxTimestamp,
-                        search.last().transform( TRANSFORM ) ) {
-
-                    @Override
-                    protected Serializer<DirectedEdge> getSerializer() {
-                        return serializer;
-                    }
-
-
-                    @Override
-                    protected RowKey generateRowKey( long shard ) {
-                        return new RowKey( targetId, type, shard );
-                    }
-
-
-                    @Override
-                    protected DirectedEdge createColumn( final MarkedEdge last ) {
-                        return new DirectedEdge( last.getSourceNode(), last.getTimestamp() );
-                    }
-
-
-                    @Override
-                    protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
-                        return new SimpleMarkedEdge( edge.id, type, targetId, edge.timestamp, marked );
-                    }
-                };
-
-
-        return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
-                graphFig.getScanPageSize() );
-    }
-
-
-    @Override
-    public Iterator<MarkedEdge> getEdgesToTargetBySourceType( final EdgeColumnFamilies columnFamilies,
-                                                              final ApplicationScope scope,
-                                                              final SearchByIdType search,
-                                                              final Collection<Shard> shards ) {
-
-        ValidationUtils.validateApplicationScope( scope );
-        GraphValidation.validateSearchByEdgeType( search );
-
-        final Id targetId = search.getNode();
-        final String sourceType = search.getIdType();
-        final String type = search.getType();
-        final long maxTimestamp = search.getMaxTimestamp();
-        final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily =
-                columnFamilies.getTargetNodeSourceTypeCfName();
-        final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
-
-        final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( SourceDirectedEdgeDescendingComparator.INSTANCE, search.getOrder());
-
-
-        final EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
-                new EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(), comparator, maxTimestamp,
-                        search.last().transform( TRANSFORM ) ) {
-                    @Override
-                    protected Serializer<DirectedEdge> getSerializer() {
-                        return serializer;
-                    }
-
-
-                    @Override
-                    protected RowKeyType generateRowKey( final long shard ) {
-                        return new RowKeyType( targetId, type, sourceType, shard );
-                    }
-
-
-                    @Override
-                    protected DirectedEdge createColumn( final MarkedEdge last ) {
-                        return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
-                    }
-
-
-                    @Override
-                    protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
-                        return new SimpleMarkedEdge( edge.id, type, targetId, edge.timestamp, marked );
-                    }
-                };
-
-        return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
-                graphFig.getScanPageSize() );
-    }
-
-
-
-
-
-    /**
-     * Simple callback to perform puts and deletes with a common row setup code
-     *
-     * @param <R> The row key type
-     * @param <C> The column type
-     */
-    private abstract class RowOp<R, C> {
-
-
-        /**
-         * Return the column family used for the write
-         */
-        protected abstract MultiTennantColumnFamily<ScopedRowKey<R>, C> getColumnFamily();
-
-        /**
-         * Get the row key
-         */
-        public abstract R getRowKey( final Shard shard );
-
-        /**
-         * Get the column family value
-         */
-        protected abstract C getDirectedEdge();
-
-        /**
-         * Get the flag on if it's deleted
-         */
-        protected abstract boolean isDeleted();
-
-
-        /**
-         * Write the edge with the given data
-         */
-        abstract void writeEdge( final MutationBatch batch,
-                                 final MultiTennantColumnFamily<ScopedRowKey<R>, C> columnFamily,
-                                 final ApplicationScope scope, final R rowKey, final C column, final Shard shard,
-                                 final boolean isDeleted );
-
-
-        /**
-         * Create a mutation batch
-         */
-        public MutationBatch createBatch( final ApplicationScope scope, final Collection<Shard> shards,
-                                          final UUID opTimestamp ) {
-
-            final MutationBatch batch =
-                    keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() )
-                            .withTimestamp( opTimestamp.timestamp() );
-
-
-            final C column = getDirectedEdge();
-            final MultiTennantColumnFamily<ScopedRowKey<R>, C> columnFamily = getColumnFamily();
-            final boolean isDeleted = isDeleted();
-
-
-            for ( Shard shard : shards ) {
-                final R rowKey = getRowKey( shard );
-                writeEdge( batch, columnFamily, scope, rowKey, column, shard, isDeleted );
-            }
-
-
-            return batch;
-        }
-    }
-
-
-    /**
-     * Perform a write of the source->target
-     */
-    private abstract class SourceWriteOp extends RowOp<RowKey, DirectedEdge> {
-
-        private final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> columnFamily;
-        private final Id sourceNodeId;
-
-
-        private final String type;
-        private final boolean isDeleted;
-        private final DirectedEdge directedEdge;
-
-
-        /**
-         * Write the source write operation
-         */
-        private SourceWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
-            this.columnFamily = edgeColumnFamilies.getSourceNodeCfName();
-
-            this.sourceNodeId = markedEdge.getSourceNode();
-
-            this.type = markedEdge.getType();
-            this.isDeleted = markedEdge.isDeleted();
-
-            this.directedEdge = new DirectedEdge( markedEdge.getTargetNode(), markedEdge.getTimestamp() );
-        }
-
-
-        @Override
-        protected MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> getColumnFamily() {
-            return columnFamily;
-        }
-
-
-        @Override
-        public RowKey getRowKey( final Shard shard ) {
-            return new RowKey( sourceNodeId, type, shard.getShardIndex() );
-        }
-
-
-        @Override
-        protected DirectedEdge getDirectedEdge() {
-            return directedEdge;
-        }
-
-
-        @Override
-        protected boolean isDeleted() {
-            return isDeleted;
-        }
-    }
-
-
-    /**
-     * Perform a write of the source->target with target type
-     */
-    private abstract class SourceTargetTypeWriteOp extends RowOp<RowKeyType, DirectedEdge> {
-
-        private final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily;
-        private final Id sourceNodeId;
-        private final String type;
-        private Id targetId;
-        private final boolean isDeleted;
-        private final DirectedEdge directedEdge;
-
-
-        /**
-         * Write the source write operation
-         */
-        private SourceTargetTypeWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
-            this.columnFamily = edgeColumnFamilies.getSourceNodeTargetTypeCfName();
-
-            this.sourceNodeId = markedEdge.getSourceNode();
-
-            this.type = markedEdge.getType();
-            this.targetId = markedEdge.getTargetNode();
-            this.isDeleted = markedEdge.isDeleted();
-
-            this.directedEdge = new DirectedEdge( targetId, markedEdge.getTimestamp() );
-        }
-
-
-        @Override
-        protected MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> getColumnFamily() {
-            return columnFamily;
-        }
-
-
-        @Override
-        public RowKeyType getRowKey( final Shard shard ) {
-            return new RowKeyType( sourceNodeId, type, targetId, shard.getShardIndex() );
-        }
-
-
-        @Override
-        protected DirectedEdge getDirectedEdge() {
-            return directedEdge;
-        }
-
-
-        @Override
-        protected boolean isDeleted() {
-            return isDeleted;
-        }
-    }
-
-
-    /**
-     * Perform a write of the target <-- source
-     */
-    private abstract class TargetWriteOp extends RowOp<RowKey, DirectedEdge> {
-
-        private final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> columnFamily;
-        private final Id targetNode;
-
-
-        private final String type;
-        private final boolean isDeleted;
-        private final DirectedEdge directedEdge;
-
-
-        /**
-         * Write the source write operation
-         */
-        private TargetWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
-            this.columnFamily = edgeColumnFamilies.getTargetNodeCfName();
-
-            this.targetNode = markedEdge.getTargetNode();
-
-            this.type = markedEdge.getType();
-            this.isDeleted = markedEdge.isDeleted();
-
-            this.directedEdge = new DirectedEdge( markedEdge.getSourceNode(), markedEdge.getTimestamp() );
-        }
-
-
-        @Override
-        protected MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> getColumnFamily() {
-            return columnFamily;
-        }
-
-
-        @Override
-        public RowKey getRowKey( final Shard shard ) {
-            return new RowKey( targetNode, type, shard.getShardIndex() );
-        }
-
-
-        @Override
-        protected DirectedEdge getDirectedEdge() {
-            return directedEdge;
-        }
-
-
-        @Override
-        protected boolean isDeleted() {
-            return isDeleted;
-        }
-    }
-
-
-    /**
-     * Perform a write of the target<--source with source type
-     */
-    private abstract class TargetSourceTypeWriteOp extends RowOp<RowKeyType, DirectedEdge> {
-
-        private final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily;
-        private final Id targetNode;
-
-        private final Id sourceNode;
-
-        final String type;
-
-        final boolean isDeleted;
-        final DirectedEdge directedEdge;
-
-
-        /**
-         * Write the source write operation
-         */
-        private TargetSourceTypeWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
-            this.columnFamily = edgeColumnFamilies.getSourceNodeTargetTypeCfName();
-
-            this.targetNode = markedEdge.getTargetNode();
-            this.sourceNode = markedEdge.getSourceNode();
-
-            this.type = markedEdge.getType();
-            this.isDeleted = markedEdge.isDeleted();
-
-            this.directedEdge = new DirectedEdge( sourceNode, markedEdge.getTimestamp() );
-        }
-
-
-        @Override
-        protected MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> getColumnFamily() {
-            return columnFamily;
-        }
-
-
-        @Override
-        public RowKeyType getRowKey( final Shard shard ) {
-            return new RowKeyType( targetNode, type, sourceNode, shard.getShardIndex() );
-        }
-
-
-        @Override
-        protected DirectedEdge getDirectedEdge() {
-            return directedEdge;
-        }
-
-
-        @Override
-        protected boolean isDeleted() {
-            return isDeleted;
-        }
-    }
-
-
-    /**
-     * Perform a write of the edge versions
-     */
-    private abstract class EdgeVersions extends RowOp<EdgeRowKey, Long> {
-
-        private final MultiTennantColumnFamily<ScopedRowKey<EdgeRowKey>, Long> columnFamily;
-        private final Id targetNode;
-
-        private final Id sourceNode;
-
-        final String type;
-
-        final boolean isDeleted;
-        final Long edgeVersion;
-
-
-        /**
-         * Write the source write operation
-         */
-        private EdgeVersions( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
-            this.columnFamily = edgeColumnFamilies.getGraphEdgeVersions();
-
-            this.targetNode = markedEdge.getTargetNode();
-            this.sourceNode = markedEdge.getSourceNode();
-
-            this.type = markedEdge.getType();
-            this.isDeleted = markedEdge.isDeleted();
-
-            this.edgeVersion = markedEdge.getTimestamp();
-        }
-
-
-        @Override
-        protected MultiTennantColumnFamily<ScopedRowKey<EdgeRowKey>, Long> getColumnFamily() {
-            return columnFamily;
-        }
-
-
-        @Override
-        public EdgeRowKey getRowKey( final Shard shard ) {
-            return new EdgeRowKey( sourceNode, type, targetNode, shard.getShardIndex() );
-        }
-
-
-        @Override
-        protected Long getDirectedEdge() {
-            return edgeVersion;
-        }
-
-
-        @Override
-        protected boolean isDeleted() {
-            return isDeleted;
-        }
-    }
-
-
-
-
-
-
-    private static final Function<Edge, MarkedEdge> TRANSFORM = new Function<Edge, MarkedEdge>() {
-        @Nullable
-        @Override
-        public MarkedEdge apply( @Nullable final Edge input ) {
-
-            if ( input == null ) {
-                return null;
-            }
-
-            if ( input instanceof MarkedEdge ) {
-                return ( MarkedEdge ) input;
-            }
-
-            return new SimpleMarkedEdge( input.getSourceNode(), input.getType(), input.getTargetNode(),
-                    input.getTimestamp(), false );
-        }
-    };
-}
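
For reference, the deleted ShardedEdgeSerializationImpl above is organised around a small template-method pattern: each public write or delete builds a RowOp that knows its column family, row key and column, and createBatch() applies that single column to one row per shard before returning the combined mutation batch. The sketch below strips that shape down to plain Java, with hypothetical stand-in types instead of the Astyanax MutationBatch and MultiTennantColumnFamily.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

//stand-in "batch" so the sketch compiles without Astyanax; hypothetical type
final class SketchBatch {
    final List<String> rows = new ArrayList<>();
}

//minimal sketch of the RowOp template-method shape from the deleted class
abstract class RowOpSketch<R, C> {

    //row key for a given shard index: every shard owns its own physical row
    protected abstract R getRowKey( long shardIndex );

    //the single column value written to every row (a directed edge, or a version timestamp)
    protected abstract C getColumn();

    //concrete operations decide what a write means: put, delete, counter increment, ...
    protected abstract void writeEdge( SketchBatch batch, R rowKey, C column );

    //template method: apply the same column to one row per shard, collected into a single batch
    public final SketchBatch createBatch( final Collection<Long> shardIndexes ) {
        final SketchBatch batch = new SketchBatch();
        final C column = getColumn();

        for ( long shard : shardIndexes ) {
            writeEdge( batch, getRowKey( shard ), column );
        }

        return batch;
    }
}

Each concrete subclass in the deleted file (SourceWriteOp, TargetWriteOp, EdgeVersions and so on) corresponds to one choice of row key and write behaviour in this shape.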

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 2fb08a4..fdb0952 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -109,24 +109,10 @@ public class GraphManagerShardConsistencyIT {
 
         originalShardSize = ConfigurationManager.getConfigInstance().getProperty( GraphFig.SHARD_SIZE );
 
-        originalShardTimeout = ConfigurationManager.getConfigInstance().getProperty( GraphFig.SHARD_CACHE_TIMEOUT );
-
-        originalShardDelta = ConfigurationManager.getConfigInstance().getProperty( GraphFig.SHARD_MIN_DELTA );
-
 
         ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_SIZE, 500 );
 
 
-        final long cacheTimeout = 2000;
-        //set our cache timeout to the above value
-        ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_CACHE_TIMEOUT, cacheTimeout );
-
-
-        final long minDelta = ( long ) ( cacheTimeout * 2.5 );
-
-        ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_MIN_DELTA, minDelta );
-
-
         //get the system property of the UUID to use.  If one is not set, use the defualt
         String uuidString = System.getProperty( "org.id", "80a42760-b699-11e3-a5e2-0800200c9a66" );
 
@@ -215,7 +201,7 @@ public class GraphManagerShardConsistencyIT {
 
 
         //min stop time the min delta + 1 cache cycle timeout
-        final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout();
+        final long minExecutionTime = 10000;
 
 
         log.info( "Writing {} edges per worker on {} workers in {} injectors", workerWriteLimit, numWorkersPerInjector,

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
index 2641ed7..22cc197 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
@@ -102,7 +102,7 @@ public class EdgeShardSerializationTest {
 
 
         Iterator<Shard> results =
-                edgeShardSerialization.getShardMetaData( scope, Optional.<Shard>absent(), sourceEdgeMeta );
+                edgeShardSerialization.getShardMetaDataLocal( scope, Optional.<Shard>absent(), sourceEdgeMeta );
 
 
         assertEquals( shard3, results.next() );
@@ -117,13 +117,13 @@ public class EdgeShardSerializationTest {
         final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( now, "edgeType", "subType" );
 
         //test we get nothing with the other node type
-        results = edgeShardSerialization.getShardMetaData( scope, Optional.<Shard>absent(), targetEdgeMeta );
+        results = edgeShardSerialization.getShardMetaDataLocal( scope, Optional.<Shard>absent(), targetEdgeMeta );
 
         assertFalse( results.hasNext() );
 
 
         //test paging and size
-        results = edgeShardSerialization.getShardMetaData( scope, Optional.of( shard2 ), sourceEdgeMeta );
+        results = edgeShardSerialization.getShardMetaDataLocal( scope, Optional.of( shard2 ), sourceEdgeMeta );
 
         assertEquals( shard2, results.next() );
 
@@ -163,7 +163,7 @@ public class EdgeShardSerializationTest {
 
 
         Iterator<Shard> results =
-                edgeShardSerialization.getShardMetaData( scope, Optional.<Shard>absent(), sourceEdgeMeta );
+                edgeShardSerialization.getShardMetaDataLocal( scope, Optional.<Shard>absent(), sourceEdgeMeta );
 
         assertEquals( shard3, results.next() );
 
@@ -177,7 +177,7 @@ public class EdgeShardSerializationTest {
 
         final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( now, "edgeType", "subType" );
 
-        results = edgeShardSerialization.getShardMetaData( scope, Optional.<Shard>absent(), targetEdgeMeta );
+        results = edgeShardSerialization.getShardMetaDataLocal( scope, Optional.<Shard>absent(), targetEdgeMeta );
 
         assertFalse( results.hasNext() );
 
@@ -185,7 +185,7 @@ public class EdgeShardSerializationTest {
         //test paging and size
         edgeShardSerialization.removeShardMeta( scope, shard1, sourceEdgeMeta ).execute();
 
-        results = edgeShardSerialization.getShardMetaData( scope, Optional.<Shard>absent(), sourceEdgeMeta );
+        results = edgeShardSerialization.getShardMetaDataLocal( scope, Optional.<Shard>absent(), sourceEdgeMeta );
 
         assertEquals( shard3, results.next() );
 
@@ -198,7 +198,7 @@ public class EdgeShardSerializationTest {
 
         edgeShardSerialization.removeShardMeta( scope, shard3, sourceEdgeMeta ).execute();
 
-        results = edgeShardSerialization.getShardMetaData( scope, Optional.<Shard>absent(), sourceEdgeMeta );
+        results = edgeShardSerialization.getShardMetaDataLocal( scope, Optional.<Shard>absent(), sourceEdgeMeta );
 
 
         assertFalse( results.hasNext() );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index ac965cd..265c997 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -79,45 +79,9 @@ public class NodeShardAllocationTest {
 
         graphFig = mock( GraphFig.class );
 
-        when( graphFig.getShardCacheSize() ).thenReturn( 10000l );
-        when( graphFig.getShardSize() ).thenReturn( 20000l );
-
-        final long timeout = 30000;
-        when( graphFig.getShardCacheTimeout() ).thenReturn( timeout );
-        when( graphFig.getShardMinDelta() ).thenReturn( timeout * 2 );
-    }
-
-
-    @Test
-    public void minTime() {
-        final ShardGroupCompaction shardGroupCompaction = mock( ShardGroupCompaction.class );
-
-        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
-
-        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
 
-        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
-
-        final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
-
-
-        final TimeService timeService = mock( TimeService.class );
-
-
-        NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                        nodeShardCounterSerialization, timeService, graphFig, shardGroupCompaction );
-
-
-        final long timeservicetime = System.currentTimeMillis();
-
-        when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
-
-        final long expected = timeservicetime - 2 * graphFig.getShardCacheTimeout();
-
-        final long returned = approximation.getMinTime();
+        when( graphFig.getShardSize() ).thenReturn( 20000l );
 
-        assertEquals( "Correct time was returned", expected, returned );
     }
 
 
@@ -153,7 +117,7 @@ public class NodeShardAllocationTest {
         final Shard firstShard = new Shard( 0l, 0l, true );
         final Shard futureShard = new Shard( 10000l, timeservicetime, false );
 
-        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 1000l );
+        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
         shardEntryGroup.addShard( futureShard );
         shardEntryGroup.addShard( firstShard );
 
@@ -195,7 +159,7 @@ public class NodeShardAllocationTest {
 
         final Shard futureShard = new Shard( 10000l, timeservicetime, true );
 
-        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 1000l );
+        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
         shardEntryGroup.addShard( futureShard );
 
         final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( nodeId, type, subType );
@@ -243,7 +207,7 @@ public class NodeShardAllocationTest {
 
         final Shard futureShard = new Shard( 0l, 0l, true );
 
-        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 1000l );
+        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup(  );
         shardEntryGroup.addShard( futureShard );
 
         final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( nodeId, type, subType );
@@ -358,7 +322,7 @@ public class NodeShardAllocationTest {
 
         final Shard futureShard = new Shard( 0l, 0l, true );
 
-        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 1000l );
+        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
         shardEntryGroup.addShard( futureShard );
 
         final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( nodeId, type, subType );
@@ -449,7 +413,7 @@ public class NodeShardAllocationTest {
 
         final Shard futureShard = new Shard( 0l, 0l, true );
 
-        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 1000l );
+        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup(  );
         shardEntryGroup.addShard( futureShard );
 
         final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( nodeId, type, subType );
@@ -518,13 +482,6 @@ public class NodeShardAllocationTest {
 
         when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
 
-        assertTrue( "Shard cache mocked", graphFig.getShardCacheTimeout() > 0 );
-
-
-        /**
-         * Simulates clock drift when 2 nodes create future shards near one another
-         */
-        final long minDelta = graphFig.getShardMinDelta();
 
 
         final Shard minShard = new Shard( 0l, 0l, true );
@@ -541,13 +498,13 @@ public class NodeShardAllocationTest {
         // should be removed
 
         //this should get dropped, It's allocated after future shard2 even though the time is less
-        final Shard futureShard1 = new Shard( 10000, minTime + minDelta, false );
+        final Shard futureShard1 = new Shard( 10000, minTime + 10000, false );
 
         //should get kept.
         final Shard futureShard2 = new Shard( 10005, minTime, false );
 
         //should be removed
-        final Shard futureShard3 = new Shard( 10010, minTime + minDelta / 2, false );
+        final Shard futureShard3 = new Shard( 10010, minTime + 1000, false );
 
         final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( nodeId, type, subType );
 
@@ -555,7 +512,7 @@ public class NodeShardAllocationTest {
          * Mock up returning a min shard
          */
         when( edgeShardSerialization
-                .getShardMetaData( same( scope ), any( Optional.class ), same( directedEdgeMeta ) ) ).thenReturn(
+                .getShardMetaDataLocal( same( scope ), any( Optional.class ), same( directedEdgeMeta ) ) ).thenReturn(
                 Arrays.asList( futureShard3, futureShard2, futureShard1, compactedShard, minShard ).iterator() );
 
 
@@ -583,7 +540,7 @@ public class NodeShardAllocationTest {
         //now verify all 4 are in this group.  This is because the first shard (0,0) (n-1_ may be the only shard other
         //nodes see while we're rolling our state.  This means it should be read and merged from as well
 
-        Collection<Shard> writeShards = shardEntryGroup.getWriteShards( minTime + minDelta );
+        Collection<Shard> writeShards = shardEntryGroup.getWriteShards( minTime  );
 
         assertEquals( "Shard size as expected", 1, writeShards.size() );
 
@@ -603,7 +560,7 @@ public class NodeShardAllocationTest {
         shardEntryGroup = result.next();
 
 
-        writeShards = shardEntryGroup.getWriteShards( minTime + minDelta );
+        writeShards = shardEntryGroup.getWriteShards( minTime  );
 
 
         assertTrue( "Previous shard present", writeShards.contains( minShard ) );
@@ -635,7 +592,7 @@ public class NodeShardAllocationTest {
 
         final TimeService timeService = mock( TimeService.class );
 
-        final long returnTime = System.currentTimeMillis() + graphFig.getShardCacheTimeout() * 2;
+        final long returnTime = System.currentTimeMillis();
 
         when( timeService.getCurrentTime() ).thenReturn( returnTime );
 
@@ -656,7 +613,7 @@ public class NodeShardAllocationTest {
          * Mock up returning an empty iterator, our audit shouldn't create a new shard
          */
         when( edgeShardSerialization
-                .getShardMetaData( same( scope ), any( Optional.class ), same( directedEdgeMeta ) ) )
+                .getShardMetaDataLocal( same( scope ), any( Optional.class ), same( directedEdgeMeta ) ) )
                 .thenReturn( Collections.<Shard>emptyList().iterator() );
 
 
@@ -699,77 +656,4 @@ public class NodeShardAllocationTest {
     }
 
 
-    @Test
-    public void invalidConfiguration() {
-
-        final ShardGroupCompaction shardGroupCompaction = mock( ShardGroupCompaction.class );
-
-        final GraphFig graphFig = mock( GraphFig.class );
-
-        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
-
-        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
-
-        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
-
-        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-
-
-        /**
-         * Return 100000 milliseconds
-         */
-        final TimeService timeService = mock( TimeService.class );
-
-        final long time = 100000l;
-
-        when( timeService.getCurrentTime() ).thenReturn( time );
-
-
-        final long cacheTimeout = 30000l;
-
-        when( graphFig.getShardCacheTimeout() ).thenReturn( 30000l );
-
-
-        final long tooSmallDelta = ( long ) ( ( cacheTimeout * 2 ) * .99 );
-
-        when( graphFig.getShardMinDelta() ).thenReturn( tooSmallDelta );
-
-        NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                        nodeShardApproximation, timeService, graphFig, shardGroupCompaction );
-
-
-        /**
-         * Should throw an exception
-         */
-        try {
-            approximation.getMinTime();
-            fail( "Should have thrown a GraphRuntimeException" );
-        }
-        catch ( GraphRuntimeException gre ) {
-            //swallow
-        }
-
-        //now test something that passes.
-
-        final long minDelta = cacheTimeout * 2;
-
-        when( graphFig.getShardMinDelta() ).thenReturn( minDelta );
-
-        long returned = approximation.getMinTime();
-
-        long expectedReturned = time - minDelta;
-
-        assertEquals( expectedReturned, returned );
-
-        final long delta = cacheTimeout * 4;
-
-        when( graphFig.getShardMinDelta() ).thenReturn( delta );
-
-        returned = approximation.getMinTime();
-
-        expectedReturned = time - delta;
-
-        assertEquals( expectedReturned, returned );
-    }
 }
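
[Not part of the patch] For orientation, a minimal sketch of the renamed read path the tests above now stub: getShardMetaData(...) becomes getShardMetaDataLocal(...) with an otherwise unchanged shape (ApplicationScope, optional start Shard, DirectedEdgeMeta in; Iterator<Shard> out). Class and method names are taken from the hunks above; the mock wiring, the static Mockito imports, and the local variables scope and directedEdgeMeta are assumed to be set up as in the test and are illustrative only.

    // illustrative sketch -- assumes scope and directedEdgeMeta exist as in NodeShardAllocationTest
    final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );

    final Shard seededShard = new Shard( 10000l, System.currentTimeMillis(), false );

    // stub the renamed method exactly as the updated test does
    when( edgeShardSerialization
            .getShardMetaDataLocal( same( scope ), any( Optional.class ), same( directedEdgeMeta ) ) )
            .thenReturn( Collections.singletonList( seededShard ).iterator() );

    // consumers iterate shard metadata the same way they did before the rename
    final Iterator<Shard> shards =
            edgeShardSerialization.getShardMetaDataLocal( scope, Optional.<Shard>absent(), directedEdgeMeta );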

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
index 3b717a4..57996ec 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
@@ -87,13 +87,13 @@ public class NodeShardCacheTest {
         final long newTime = 10000l;
 
 
-        NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
+        NodeShardCache cache = new NodeShardCacheImpl( allocation );
 
 
         final Optional max = Optional.absent();
 
 
-        final ShardEntryGroup group = new ShardEntryGroup( newTime );
+        final ShardEntryGroup group = new ShardEntryGroup( );
         group.addShard( new Shard( 0, 0, true ) );
 
 
@@ -163,7 +163,7 @@ public class NodeShardCacheTest {
          * Set our min mid and max
          */
 
-        NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
+        NodeShardCache cache = new NodeShardCacheImpl( allocation );
 
 
         final Shard minShard = new Shard( 0, 0, true );
@@ -174,14 +174,14 @@ public class NodeShardCacheTest {
         /**
          * Simulate returning all shards
          */
-        final ShardEntryGroup minShardGroup = new ShardEntryGroup( 10000 );
+        final ShardEntryGroup minShardGroup = new ShardEntryGroup( );
         minShardGroup.addShard( minShard );
 
-        final ShardEntryGroup midShardGroup = new ShardEntryGroup( 10000 );
+        final ShardEntryGroup midShardGroup = new ShardEntryGroup( );
         midShardGroup.addShard( midShard );
 
 
-        final ShardEntryGroup maxShardGroup = new ShardEntryGroup( 10000 );
+        final ShardEntryGroup maxShardGroup = new ShardEntryGroup( );
         maxShardGroup.addShard( maxShard );
 
 
@@ -302,8 +302,6 @@ public class NodeShardCacheTest {
 
     private GraphFig getFigMock() {
         final GraphFig graphFig = mock( GraphFig.class );
-        when( graphFig.getShardCacheSize() ).thenReturn( 1000l );
-        when( graphFig.getShardCacheTimeout() ).thenReturn( 30000l );
 
         return graphFig;
     }
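
[Not part of the patch] A sketch of the cache construction the updated test exercises, under the assumption that only the NodeShardAllocation collaborator is still required now that the local cache settings (size, timeout, refresh workers) have been dropped from GraphFig:

    // illustrative sketch -- the mock stands in for a real NodeShardAllocationImpl
    final NodeShardAllocation allocation = mock( NodeShardAllocation.class );

    final NodeShardCache cache = new NodeShardCacheImpl( allocation );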

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
index 9289340..240c3ff 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
@@ -37,11 +37,10 @@ public class ShardEntryGroupTest {
     @Test
     public void singleEntry() {
 
-        final long delta = 10000;
 
         Shard rootShard = new Shard( 0, 0, false );
 
-        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
 
         final boolean result = shardEntryGroup.addShard( rootShard );
 
@@ -58,14 +57,12 @@ public class ShardEntryGroupTest {
     @Test
     public void allocatedWithinDelta() {
 
-        final long delta = 10000;
-
         Shard firstShard = new Shard( 1000, 1000, false );
 
         Shard secondShard = new Shard( 1000, 1001, false );
 
 
-        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup(  );
 
         boolean result = shardEntryGroup.addShard( secondShard );
 
@@ -94,8 +91,6 @@ public class ShardEntryGroupTest {
     @Test
     public void testShardTarget() {
 
-        final long delta = 10000;
-
         Shard compactedShard = new Shard( 0, 0, true );
 
         Shard firstShard = new Shard( 1000, 1000, false );
@@ -103,7 +98,7 @@ public class ShardEntryGroupTest {
         Shard secondShard = new Shard( 1000, 1001, false );
 
 
-        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
 
         boolean result = shardEntryGroup.addShard( secondShard );
 
@@ -129,27 +124,25 @@ public class ShardEntryGroupTest {
 
         //shouldn't return true, since we haven't passed delta time in the second shard
         assertFalse( "Merge cannot be run within min time",
-                shardEntryGroup.shouldCompact( firstShard.getCreatedTime() + delta ) );
+                shardEntryGroup.shouldCompact( firstShard.getCreatedTime() ) );
 
         //shouldn't return true, since we haven't passed delta time in the second shard
         assertFalse( "Merge cannot be run within min time",
-                shardEntryGroup.shouldCompact( secondShard.getCreatedTime() + delta ) );
+                shardEntryGroup.shouldCompact( secondShard.getCreatedTime() ) );
 
         //we haven't passed the delta in the neighbor that would be our source, shard2, we shouldn't return true
         //we read from shard2 and write to shard1
         assertFalse( "Merge cannot be run with after min time",
-                shardEntryGroup.shouldCompact( firstShard.getCreatedTime() + delta + 1 ) );
+                shardEntryGroup.shouldCompact( firstShard.getCreatedTime()) );
 
         assertTrue( "Merge should be run with after min time",
-                shardEntryGroup.shouldCompact( secondShard.getCreatedTime() + delta + 1 ) );
+                shardEntryGroup.shouldCompact( secondShard.getCreatedTime()) );
     }
 
 
     @Test
     public void multipleShardGroups() {
 
-        final long delta = 10000;
-
         Shard firstShard = new Shard( 1000, 10000, false );
 
         Shard secondShard = new Shard( 999, 9000, false );
@@ -159,7 +152,7 @@ public class ShardEntryGroupTest {
         Shard compactedShard2 = new Shard( 800, 7000, true );
 
 
-        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup(  );
 
         boolean result = shardEntryGroup.addShard( firstShard );
 
@@ -177,7 +170,7 @@ public class ShardEntryGroupTest {
 
         assertFalse( "Shouldn't add since it's compacted", result );
 
-        ShardEntryGroup secondGroup = new ShardEntryGroup( delta );
+        ShardEntryGroup secondGroup = new ShardEntryGroup(  );
 
         result = secondGroup.addShard( compactedShard2 );
 
@@ -187,9 +180,6 @@ public class ShardEntryGroupTest {
 
     @Test
     public void boundShardGroup() {
-
-        final long delta = 10000;
-
         Shard firstShard = new Shard( 1000, 10000, false );
 
         Shard secondShard = new Shard( 999, 9000, false );
@@ -197,7 +187,7 @@ public class ShardEntryGroupTest {
         Shard compactedShard1 = new Shard( 900, 8000, true );
 
 
-        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
 
         boolean result = shardEntryGroup.addShard( firstShard );
 
@@ -220,18 +210,18 @@ public class ShardEntryGroupTest {
 
         //shouldn't return true, since we haven't passed delta time in the second shard
         assertFalse( "Merge cannot be run within min time",
-                shardEntryGroup.shouldCompact( firstShard.getCreatedTime() + delta ) );
+                shardEntryGroup.shouldCompact( firstShard.getCreatedTime()  ) );
 
         //shouldn't return true, since we haven't passed delta time in the second shard
         assertFalse( "Merge cannot be run within min time",
-                shardEntryGroup.shouldCompact( secondShard.getCreatedTime() + delta ) );
+                shardEntryGroup.shouldCompact( secondShard.getCreatedTime()  ) );
 
 
         assertFalse( "Merge cannot be run within min time",
-                shardEntryGroup.shouldCompact( secondShard.getCreatedTime() + delta + 1 ) );
+                shardEntryGroup.shouldCompact( secondShard.getCreatedTime()  + 1 ) );
 
         assertTrue( "Merge should be run with after min time",
-                shardEntryGroup.shouldCompact( firstShard.getCreatedTime() + delta + 1 ) );
+                shardEntryGroup.shouldCompact( firstShard.getCreatedTime()  + 1 ) );
     }
 
 
@@ -241,8 +231,6 @@ public class ShardEntryGroupTest {
     @Test
     public void getAllReadShards() {
 
-        final long delta = 10000;
-
         Shard firstShard = new Shard( 1000, 10000, false );
 
         Shard secondShard = new Shard( 999, 9000, false );
@@ -250,7 +238,7 @@ public class ShardEntryGroupTest {
         Shard compactedShard1 = new Shard( 900, 8000, true );
 
 
-        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup(  );
 
         boolean result = shardEntryGroup.addShard( firstShard );
 
@@ -280,8 +268,6 @@ public class ShardEntryGroupTest {
     @Test
     public void getAllWriteShardsNotPastCompaction() {
 
-        final long delta = 10000;
-
         Shard firstShard = new Shard( 1000, 10000, false );
 
         Shard secondShard = new Shard( 999, 9000, false );
@@ -289,7 +275,7 @@ public class ShardEntryGroupTest {
         Shard compactedShard = new Shard( 900, 8000, true );
 
 
-        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
 
         boolean result = shardEntryGroup.addShard( firstShard );
 
@@ -304,14 +290,14 @@ public class ShardEntryGroupTest {
         assertTrue( "Shard added", result );
 
 
-        Collection<Shard> writeShards = shardEntryGroup.getWriteShards( firstShard.getCreatedTime() + delta );
+        Collection<Shard> writeShards = shardEntryGroup.getWriteShards( firstShard.getCreatedTime()  );
 
         assertEquals( "Shard size correct", 1, writeShards.size() );
 
         assertTrue( "Root shard present", writeShards.contains( compactedShard ) );
 
 
-        writeShards = shardEntryGroup.getWriteShards( secondShard.getCreatedTime() + delta );
+        writeShards = shardEntryGroup.getWriteShards( secondShard.getCreatedTime()  );
 
         assertEquals( "Shard size correct", 1, writeShards.size() );
 
@@ -321,7 +307,7 @@ public class ShardEntryGroupTest {
         /**
          * Not the max created timestamp, shouldn't return less than all shards
          */
-        writeShards = shardEntryGroup.getWriteShards( secondShard.getCreatedTime() + 1 + delta );
+        writeShards = shardEntryGroup.getWriteShards( secondShard.getCreatedTime() + 1  );
 
         assertEquals( "Shard size correct", 1, writeShards.size() );
 
@@ -331,7 +317,7 @@ public class ShardEntryGroupTest {
 
         assertEquals( "Compaction target correct", secondShard, shardEntryGroup.getCompactionTarget() );
 
-        writeShards = shardEntryGroup.getWriteShards( firstShard.getCreatedTime() + 1 + delta );
+        writeShards = shardEntryGroup.getWriteShards( firstShard.getCreatedTime() + 1  );
 
         assertEquals( "Shard size correct", 1, writeShards.size() );
 
@@ -343,15 +329,13 @@ public class ShardEntryGroupTest {
     @Test( expected = IllegalArgumentException.class )
     public void failsInsertionOrder() {
 
-        final long delta = 10000;
-
         Shard secondShard = new Shard( 20000, 10000, false );
 
         Shard firstShard = new Shard( 10000, 10000, false );
 
         Shard rootShard = new Shard( 0, 0, false );
 
-        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
 
         boolean result = shardEntryGroup.addShard( secondShard );
 
@@ -370,15 +354,13 @@ public class ShardEntryGroupTest {
     @Test
     public void shardEntryAddList() {
 
-        final long delta = 10000;
-
         Shard highShard = new Shard( 30000, 1000, false );
 
         Shard midShard = new Shard( 20000, 1000, true );
 
         Shard lowShard = new Shard( 10000, 1000, false );
 
-        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup(  );
 
         boolean result = shardEntryGroup.addShard( highShard );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
index 7d4b7f6..ab457d3 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
@@ -84,7 +84,7 @@ public class ShardGroupCompactionTest {
         //we shouldn't be able to compact, should throw an exception
         final long timeNow = createTime + delta - 1;
 
-        ShardEntryGroup group = new ShardEntryGroup( delta );
+        ShardEntryGroup group = new ShardEntryGroup( );
         group.addShard( new Shard( 2000, createTime, false ) );
         group.addShard( new Shard( 1000, 5000, true ) );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
index 32a0cda..71947f8 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
@@ -97,7 +97,6 @@ public class NodeShardApproximationTest {
 
         graphFig = mock( GraphFig.class );
 
-        when( graphFig.getShardCacheSize() ).thenReturn( 10000l );
         when( graphFig.getShardSize() ).thenReturn( 250000l );
         when( graphFig.getCounterFlushQueueSize() ).thenReturn( 10000 );
 
@@ -466,57 +465,53 @@ public class NodeShardApproximationTest {
         }
 
 
-        @Override
-        public long getShardCacheTimeout() {
-            return 0;  //To change body of implemented methods use File | Settings | File Templates.
-        }
 
 
         @Override
-        public long getShardMinDelta() {
-            return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        public int getShardAuditWorkerCount() {
+            return 0;
         }
 
 
         @Override
-        public long getShardCacheSize() {
-            return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        public int getShardAuditWorkerQueueSize() {
+            return 0;
         }
 
 
         @Override
-        public int getShardCacheRefreshWorkerCount() {
-            return 0;
+        public long getCounterFlushCount() {
+            return 100000l;
         }
 
 
         @Override
-        public int getShardAuditWorkerCount() {
-            return 0;
+        public long getCounterFlushInterval() {
+            return 30000l;
         }
 
 
         @Override
-        public int getShardAuditWorkerQueueSize() {
-            return 0;
+        public int getCounterFlushQueueSize() {
+            return 10000;
         }
 
 
         @Override
-        public long getCounterFlushCount() {
-            return 100000l;
+        public String getShardWriteConsistency() {
+            return null;
         }
 
 
         @Override
-        public long getCounterFlushInterval() {
-            return 30000l;
+        public String getShardReadConsistency() {
+            return null;
         }
 
 
         @Override
-        public int getCounterFlushQueueSize() {
-            return 10000;
+        public String getShardAuditConsistency() {
+            return null;
         }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
index 0eccc3e..238e238 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
@@ -56,11 +56,10 @@ public class ShardEntryGroupIteratorTest {
         final ApplicationScope scope = new ApplicationScopeImpl( IdGenerator.createId( "application" ) );
         final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( IdGenerator.createId( "source" ), "test" );
         final ShardGroupCompaction shardGroupCompaction = mock( ShardGroupCompaction.class );
-        final long delta = 10000;
         final Iterator<Shard> noShards = Collections.<Shard>emptyList().iterator();
 
         //should blow up, our iterator is empty
-        new ShardEntryGroupIterator( noShards, delta, shardGroupCompaction, scope, directedEdgeMeta );
+        new ShardEntryGroupIterator( noShards, shardGroupCompaction, scope, directedEdgeMeta );
     }
 
 
@@ -74,11 +73,10 @@ public class ShardEntryGroupIteratorTest {
         final ShardGroupCompaction shardGroupCompaction = mock( ShardGroupCompaction.class );
 
         final Shard minShard = new Shard( 0, 0, true );
-        final long delta = 10000;
         final Iterator<Shard> noShards = Collections.singleton( minShard ).iterator();
 
         ShardEntryGroupIterator entryGroupIterator =
-                new ShardEntryGroupIterator( noShards, delta, shardGroupCompaction, scope, directedEdgeMeta );
+                new ShardEntryGroupIterator( noShards, shardGroupCompaction, scope, directedEdgeMeta );
 
 
         assertTrue( "Root shard always present", entryGroupIterator.hasNext() );
@@ -163,7 +161,7 @@ public class ShardEntryGroupIteratorTest {
 
 
         ShardEntryGroupIterator entryGroupIterator =
-                new ShardEntryGroupIterator( noShards, delta, shardGroupCompaction, scope, directedEdgeMeta );
+                new ShardEntryGroupIterator( noShards, shardGroupCompaction, scope, directedEdgeMeta );
 
         assertTrue( "max group present", entryGroupIterator.hasNext() );
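
[Not part of the patch] The iterator loses its delta argument as well. A sketch of the updated construction, assuming scope, directedEdgeMeta and shardGroupCompaction are set up as in the test above and that the iterator yields ShardEntryGroup instances, as the group assertions suggest:

    // illustrative sketch -- only the constructor signature changes
    final Iterator<Shard> shards = Collections.singleton( new Shard( 0, 0, true ) ).iterator();

    final ShardEntryGroupIterator groups =
            new ShardEntryGroupIterator( shards, shardGroupCompaction, scope, directedEdgeMeta );

    while ( groups.hasNext() ) {
        final ShardEntryGroup group = groups.next();
        // inspect group.getWriteShards(...) / group.getCompactionTarget() as needed
    }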
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java
index 4d18cef..0562a6c 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java
@@ -17,7 +17,7 @@
  *  * directory of this distribution.
  *
  */
-package org.apache.usergrid.persistence.index.ElasticSearchQueryBuilder;
+package org.apache.usergrid.persistence.index.elasticsearchquerybuilder;
 
 
 import org.elasticsearch.action.search.SearchRequestBuilder;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 87e2dbd..fcfe39e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -32,7 +32,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.index.*;
-import org.apache.usergrid.persistence.index.ElasticSearchQueryBuilder.SearchRequestBuilderStrategyV2;
+import org.apache.usergrid.persistence.index.elasticsearchquerybuilder.SearchRequestBuilderStrategyV2;
 import org.apache.usergrid.persistence.index.exceptions.IndexException;
 import org.apache.usergrid.persistence.index.migration.IndexDataVersions;
 import org.apache.usergrid.persistence.index.query.ParsedQuery;