Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/29 19:34:01 UTC

[03/20] Implemented new rolling shard algorithm. This should allow eventual shard consistency between all clients without the need for an external locking allocation system.
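
For readers skimming the patch, here is a minimal standalone sketch of the grouping idea behind the rolling algorithm (plain timestamps stand in for the patch's Shard type; the real grouping lives in ShardEntryGroup and ShardEntryGroupIterator below): shards created within a configured minimum delta of one another are treated as a single group, so every client eventually converges on the same groups without an external lock.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class RollingShardSketch {

        /**
         * Group shard creation times (sorted descending) so that shards created
         * within minDelta of the newest shard in a group share that group.
         */
        static List<List<Long>> group( final List<Long> createdTimes, final long minDelta ) {
            final List<List<Long>> groups = new ArrayList<>();
            List<Long> current = null;

            for ( Long created : createdTimes ) {
                //start a new group when this shard is more than minDelta older
                //than the newest shard of the current group
                if ( current == null || current.get( 0 ) - created > minDelta ) {
                    current = new ArrayList<>();
                    groups.add( current );
                }
                current.add( created );
            }
            return groups;
        }

        public static void main( String[] args ) {
            //shards created 5ms apart share a group; the much older one stands alone
            System.out.println( group( Arrays.asList( 1000L, 995L, 100L ), 10L ) );
            //prints [[1000, 995], [100]]
        }
    }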

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index cf70669..3c3b99c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -20,27 +20,31 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
 
-import org.apache.commons.collections4.iterators.PushbackIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.ShardKey;
-import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 
@@ -50,8 +54,13 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 public class NodeShardAllocationImpl implements NodeShardAllocation {
 
 
+    private static final Logger LOG = LoggerFactory.getLogger( NodeShardAllocationImpl.class );
+
+    private static final Shard MIN_SHARD = new Shard(0, 0, true);
+
     private final EdgeShardSerialization edgeShardSerialization;
-//    private final NodeShardCounterSerialization edgeShardCounterSerialization;
+    private final EdgeColumnFamilies edgeColumnFamilies;
+    private final ShardedEdgeSerialization shardedEdgeSerialization;
     private final NodeShardApproximation nodeShardApproximation;
     private final TimeService timeService;
     private final GraphFig graphFig;
@@ -59,9 +68,13 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
     @Inject
     public NodeShardAllocationImpl( final EdgeShardSerialization edgeShardSerialization,
-                                    final NodeShardApproximation nodeShardApproximation,
-                                    final TimeService timeService, final GraphFig graphFig ) {
+                                    final EdgeColumnFamilies edgeColumnFamilies,
+                                    final ShardedEdgeSerialization shardedEdgeSerialization,
+                                    final NodeShardApproximation nodeShardApproximation, final TimeService timeService,
+                                    final GraphFig graphFig ) {
         this.edgeShardSerialization = edgeShardSerialization;
+        this.edgeColumnFamilies = edgeColumnFamilies;
+        this.shardedEdgeSerialization = shardedEdgeSerialization;
         this.nodeShardApproximation = nodeShardApproximation;
         this.timeService = timeService;
         this.graphFig = graphFig;
@@ -69,109 +82,115 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
 
     @Override
-    public Iterator<Long> getShards( final ApplicationScope scope, final Id nodeId, Optional<Long> maxShardId, final String... edgeTypes ) {
-
-        final Iterator<Long> existingShards =
-                edgeShardSerialization.getEdgeMetaData( scope, nodeId, maxShardId, edgeTypes );
-
-        final PushbackIterator<Long> pushbackIterator = new PushbackIterator( existingShards );
-//
-//
-//        final long now = timeService.getCurrentTime();
-//
-//
-//        final List<Long> futures = new ArrayList<Long>();
-//
-//
-//        //loop through all shards, any shard > now+1 should be deleted
-//        while ( pushbackIterator.hasNext() ) {
-//
-//            final Long value = pushbackIterator.next();
-//
-//            //we're done, our current time uuid is greater than the value stored
-//            if ( now >= value ) {
-//                //push it back into the iterator
-//                pushbackIterator.pushback( value );
-//                break;
-//            }
-//
-//            futures.add( value );
-//        }
-//
-//
-//        //we have more than 1 future value, we need to remove it
-//
-//        MutationBatch cleanup = keyspace.prepareMutationBatch();
-//
-//        //remove all futures except the last one, it is the only value we shouldn't lazy remove
-//        for ( int i = 0; i < futures.size() -1; i++ ) {
-//            final long toRemove = futures.get( i );
-//
-//            final MutationBatch batch = edgeShardSerialization.removeEdgeMeta( scope, nodeId, toRemove, edgeTypes );
-//
-//            cleanup.mergeShallow( batch );
-//        }
-//
-//
-//        try {
-//            cleanup.execute();
-//        }
-//        catch ( ConnectionException e ) {
-//            throw new GraphRuntimeException( "Unable to remove future shards, mutation error", e );
-//        }
-//
-//
-//        final int futuresSize =  futures.size();
-//
-//        if ( futuresSize > 0 ) {
-//            pushbackIterator.pushback( futures.get( futuresSize - 1 ) );
-//        }
-//
-//
-        /**
-         * Nothing to iterate, return an iterator with 0.
-         */
-        if(!pushbackIterator.hasNext()){
-            pushbackIterator.pushback( 0l );
+    public Iterator<ShardEntryGroup> getShards( final ApplicationScope scope,
+                                                final Optional<Shard> maxShardId, final DirectedEdgeMeta directedEdgeMeta ) {
+
+        ValidationUtils.validateApplicationScope(scope);
+        Preconditions.checkNotNull( maxShardId, "maxShardId cannot be null" );
+        GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta );
+
+        Iterator<Shard> existingShards;
+
+    //it's a new node; we don't need to check Cassandra, it won't exist there yet
+        if(isNewNode(directedEdgeMeta)){
+            existingShards = Collections.singleton( MIN_SHARD ).iterator();
+        }
+
+        else{
+            existingShards = edgeShardSerialization.getShardMetaData( scope, maxShardId, directedEdgeMeta );
+        }
+
+        if(existingShards == null || !existingShards.hasNext()){
+
+            try {
+                edgeShardSerialization.writeShardMeta( scope, MIN_SHARD, directedEdgeMeta ).execute();
+            }
+            catch ( ConnectionException e ) {
+                throw new GraphRuntimeException( "Unable to allocate minimum shard", e );
+            }
+
+            existingShards = Collections.singleton( MIN_SHARD ).iterator();
         }
 
-        return pushbackIterator;
+        return new ShardEntryGroupIterator( existingShards, graphFig.getShardMinDelta() );
     }
 
 
     @Override
-    public boolean auditMaxShard( final ApplicationScope scope, final Id nodeId, final String... edgeType ) {
+    public boolean auditShard( final ApplicationScope scope, final ShardEntryGroup shardEntryGroup, final DirectedEdgeMeta directedEdgeMeta) {
 
-        final Iterator<Long> maxShards = getShards( scope, nodeId, Optional.<Long>absent(), edgeType );
+        ValidationUtils.validateApplicationScope(scope);
+        GraphValidation.validateShardEntryGroup( shardEntryGroup );
+        GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta );
 
+        Preconditions.checkNotNull( shardEntryGroup, "shardEntryGroup cannot be null" );
 
-        //if the first shard has already been allocated, do nothing.
 
-        //now is already > than the max, don't do anything
-        if ( !maxShards.hasNext() ) {
+        /**
+         * Nothing to do; this group was created very recently, so we don't create a new shard
+         */
+        if (shardEntryGroup.isCompactionPending()) {
+            return false;
+        }
+
+        //we can't allocate, we have more than 1 write shard currently.  We need to compact first
+        if(shardEntryGroup.entrySize() != 1){
+            return false;
+        }
+
+
+        /**
+         * Check the min shard in our system
+         */
+        final Shard shard = shardEntryGroup.getMinShard();
+
+
+        if (shard.getCreatedTime() >= getMinTime()){
             return false;
         }
 
-        final long maxShard = maxShards.next();
 
         /**
          * Check if we have a count for our shard allocation
          */
 
+        final long count =
+                nodeShardApproximation.getCount( scope, shard, directedEdgeMeta);
 
-        final long count = nodeShardApproximation.getCount( scope, nodeId, maxShard, edgeType );
 
         if ( count < graphFig.getShardSize() ) {
             return false;
         }
 
-        //try to get a lock here, and fail if one isn't present
 
-        final long newShardTime = timeService.getCurrentTime() + graphFig.getShardCacheTimeout() * 2;
+
+
+        /**
+         * Allocate the shard
+         */
+
+        final Iterator<MarkedEdge> edges  = directedEdgeMeta.loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup, Long.MAX_VALUE );
+
+
+        if ( !edges.hasNext() ) {
+            LOG.warn( "Tried to allocate a new shard for edge meta data {}, "
+                    + "but no max value could be found in that row", directedEdgeMeta );
+            return false;
+        }
+
+        //we have a next, allocate it based on the max
+
+        MarkedEdge marked = edges.next();
+
+        final long createTimestamp = timeService.getCurrentTime();
+
+        final Shard newShard = new Shard(marked.getTimestamp(), createTimestamp, false);
 
 
         try {
-            this.edgeShardSerialization.writeEdgeMeta( scope, nodeId, newShardTime, edgeType ).execute();
+            this.edgeShardSerialization
+                    .writeShardMeta( scope, newShard, directedEdgeMeta )
+                    .execute();
         }
         catch ( ConnectionException e ) {
             throw new GraphRuntimeException( "Unable to write the new edge metadata", e );
@@ -181,4 +200,50 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
         return true;
     }
 
+
+    @Override
+    public long getMinTime() {
+
+        final long minimumAllowed = 2 * graphFig.getShardCacheTimeout();
+
+        final long minDelta = graphFig.getShardMinDelta();
+
+
+        if ( minDelta < minimumAllowed ) {
+            throw new GraphRuntimeException( String.format(
+                    "You must configure the property %s to be >= 2 x %s.  Otherwise you risk losing data",
+                    GraphFig.SHARD_MIN_DELTA, GraphFig.SHARD_CACHE_TIMEOUT ) );
+        }
+
+        return timeService.getCurrentTime() - minDelta;
+    }
+
+
+    /**
+     * Return true if the node has been created within our timeout.  If this is the case, we don't need to check
+     * Cassandra; we know the row won't exist yet
+     *
+     * @param directedEdgeMeta the edge metadata containing the nodes to inspect
+     * @return true if any node in the metadata was created within the timeout
+     */
+    private boolean isNewNode(DirectedEdgeMeta directedEdgeMeta){
+
+        /**
+         * the max time we can allow, in the 100 nanosecond units used by UUID timestamps
+         */
+        final long maxTime = (timeService.getCurrentTime() + graphFig.getShardCacheTimeout() )* 10000;
+
+        for(DirectedEdgeMeta.NodeMeta node: directedEdgeMeta.getNodes()){
+            final long uuidTime = node.getId().getUuid().timestamp();
+
+            if(uuidTime < maxTime){
+                return true;
+            }
+        }
+
+        return false;
+
+    }
+
+
 }
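
As an aside for reviewers, the getMinTime() contract above in a self-contained sketch (values hypothetical; IllegalStateException stands in for GraphRuntimeException): with a 30 second cache timeout the minimum delta must be at least 60 seconds, and only shards created before now - minDelta are old enough for every client's cache to have observed them, which is what makes them safe to audit.

    public class MinTimeSketch {

        static long getMinTime( final long nowMillis, final long shardCacheTimeout, final long shardMinDelta ) {
            final long minimumAllowed = 2 * shardCacheTimeout;

            //mirrors the check in NodeShardAllocationImpl.getMinTime()
            if ( shardMinDelta < minimumAllowed ) {
                throw new IllegalStateException( "shard min delta must be >= 2 x the shard cache timeout" );
            }

            return nowMillis - shardMinDelta;
        }

        public static void main( String[] args ) {
            final long now = System.currentTimeMillis();
            final long minTime = getMinTime( now, 30_000L, 60_000L );

            //a shard created 90s ago predates minTime, so every client's cache
            //has seen it and it is safe to audit; a 10s old shard is not
            System.out.println( now - 90_000L < minTime ); //true
            System.out.println( now - 10_000L < minTime ); //false
        }
    }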

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index 3b78898..d3dd86e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -21,55 +21,77 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 import java.beans.PropertyChangeEvent;
 import java.beans.PropertyChangeListener;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.TreeSet;
-import java.util.UUID;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 import org.apache.usergrid.persistence.graph.serialization.util.IterableUtil;
-import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.Weigher;
 import com.google.inject.Inject;
 
 
-
 /**
  * Simple implementation of the shard cache.  Uses a local Guava cache with a timeout.  If a value is not present in
  * the cache, it will need to be loaded from Cassandra.
  */
 public class NodeShardCacheImpl implements NodeShardCache {
 
+    private static final Logger LOG = LoggerFactory.getLogger( NodeShardCacheImpl.class );
+
+    /**
+     * Only cache shards that have < 10k groups.  This is an arbitrary amount, and may change with profiling and
+     * testing
+     */
+    private static final int MAX_WEIGHT_PER_ELEMENT = 10000;
+
     private final NodeShardAllocation nodeShardAllocation;
     private final GraphFig graphFig;
+    private final TimeService timeservice;
 
     private LoadingCache<CacheKey, CacheEntry> graphs;
 
 
     /**
-     *
-     * @param nodeShardAllocation
+     * @param nodeShardAllocation
      * @param graphFig
+     * @param timeservice
      */
     @Inject
-    public NodeShardCacheImpl( final NodeShardAllocation nodeShardAllocation, final GraphFig graphFig ) {
-        Preconditions.checkNotNull(nodeShardAllocation, "nodeShardAllocation is required");
-        Preconditions.checkNotNull(graphFig, "consistencyFig is required");
+    public NodeShardCacheImpl( final NodeShardAllocation nodeShardAllocation, final GraphFig graphFig,
+                               final TimeService timeservice ) {
+
+        Preconditions.checkNotNull( nodeShardAllocation, "nodeShardAllocation is required" );
+        Preconditions.checkNotNull( graphFig, "graphFig is required" );
+        Preconditions.checkNotNull( timeservice, "timeservice is required" );
 
         this.nodeShardAllocation = nodeShardAllocation;
         this.graphFig = graphFig;
+        this.timeservice = timeservice;
 
         /**
          * Add our listener to reconstruct the shard
@@ -79,24 +101,29 @@ public class NodeShardCacheImpl implements NodeShardCache {
             public void propertyChange( final PropertyChangeEvent evt ) {
                 final String propertyName = evt.getPropertyName();
 
-                if ( propertyName.equals( GraphFig.SHARD_CACHE_SIZE ) || propertyName.equals( GraphFig.SHARD_CACHE_TIMEOUT ) ) {
+                if ( propertyName.equals( GraphFig.SHARD_CACHE_SIZE ) || propertyName
+                        .equals( GraphFig.SHARD_CACHE_TIMEOUT ) ) {
+
                     updateCache();
                 }
             }
         } );
 
         /**
-         * Initialize the shard
+         * Initialize the shard cache
          */
         updateCache();
     }
 
 
     @Override
-    public long getSlice( final ApplicationScope scope, final Id nodeId, final long timestamp, final String... edgeType ) {
+    public ShardEntryGroup getWriteShardGroup( final ApplicationScope scope, final long timestamp,
+                                               final DirectedEdgeMeta directedEdgeMeta ) {
 
+        ValidationUtils.validateApplicationScope(scope);
+        GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta );
 
-        final CacheKey key = new CacheKey( scope, nodeId, edgeType );
+        final CacheKey key = new CacheKey( scope, directedEdgeMeta );
         CacheEntry entry;
 
         try {
@@ -106,7 +133,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
             throw new GraphRuntimeException( "Unable to load shard key for graph", e );
         }
 
-        final Long shardId = entry.getShardId( timestamp );
+        final ShardEntryGroup shardId = entry.getShardId( timestamp );
 
         if ( shardId != null ) {
             return shardId;
@@ -118,55 +145,41 @@ public class NodeShardCacheImpl implements NodeShardCache {
 
 
     @Override
-    public Iterator<Long> getVersions( final ApplicationScope scope, final Id nodeId, final long maxTimestamp,
-                                       final String... edgeType ) {
-        final CacheKey key = new CacheKey( scope, nodeId, edgeType );
-              CacheEntry entry;
-
-              try {
-                  entry = this.graphs.get( key );
-              }
-              catch ( ExecutionException e ) {
-                  throw new GraphRuntimeException( "Unable to load shard key for graph", e );
-              }
-
-        Iterator<Long> iterator = entry.getShards( maxTimestamp );
-
-        if(iterator == null){
-            return Collections.<Long>emptyList().iterator();
+    public Iterator<ShardEntryGroup> getReadShardGroup( final ApplicationScope scope, final long maxTimestamp,
+                                                        final DirectedEdgeMeta directedEdgeMeta ) {
+        ValidationUtils.validateApplicationScope(scope);
+        GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta );
+
+        final CacheKey key = new CacheKey( scope, directedEdgeMeta );
+        CacheEntry entry;
+
+        try {
+            entry = this.graphs.get( key );
+        }
+        catch ( ExecutionException e ) {
+            throw new GraphRuntimeException( "Unable to load shard key for graph", e );
+        }
+
+        Iterator<ShardEntryGroup> iterator = entry.getShards( maxTimestamp );
+
+        if ( iterator == null ) {
+            return Collections.<ShardEntryGroup>emptyList().iterator();
         }
 
         return iterator;
     }
 
+
     /**
      * This is a race condition.  We could re-init the cache while another thread is reading it.  This is fine; the read
      * doesn't have to be precise.  The algorithm accounts for stale data.
      */
     private void updateCache() {
 
-        this.graphs = CacheBuilder.newBuilder().maximumSize( graphFig.getShardCacheSize() )
-                  .expireAfterWrite( graphFig.getShardCacheSize(), TimeUnit.MILLISECONDS )
-                  .build( new CacheLoader<CacheKey, CacheEntry>() {
-
-
-                      @Override
-                      public CacheEntry load( final CacheKey key ) throws Exception {
-
-//
-//                          /**
-//                           * Perform an audit in case we need to allocate a new shard
-//                           */
-//                          nodeShardAllocation.auditMaxShard( key.scope, key.id, key.types );
-//                          //TODO, we need to put some sort of upper bounds on this, it could possibly get too large
-
-
-                          final Iterator<Long> edges = nodeShardAllocation
-                                  .getShards( key.scope, key.id, Optional.<Long>absent(), key.types );
-
-                          return new CacheEntry( edges );
-                      }
-                  } );
+        this.graphs = CacheBuilder.newBuilder().expireAfterWrite( graphFig.getShardCacheTimeout(), TimeUnit.MILLISECONDS )
+                                  .removalListener( new ShardRemovalListener() )
+                                  .maximumWeight( MAX_WEIGHT_PER_ELEMENT * graphFig.getShardCacheSize() )
+                                  .weigher( new ShardWeigher() ).build( new ShardCacheLoader() );
     }
 
 
@@ -175,14 +188,12 @@ public class NodeShardCacheImpl implements NodeShardCache {
      */
     private static class CacheKey {
         private final ApplicationScope scope;
-        private final Id id;
-        private final String[] types;
+        private final DirectedEdgeMeta directedEdgeMeta;
 
 
-        private CacheKey( final ApplicationScope scope, final Id id, final String[] types ) {
+        private CacheKey( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta ) {
             this.scope = scope;
-            this.id = id;
-            this.types = types;
+            this.directedEdgeMeta = directedEdgeMeta;
         }
 
 
@@ -197,21 +208,23 @@ public class NodeShardCacheImpl implements NodeShardCache {
 
             final CacheKey cacheKey = ( CacheKey ) o;
 
-            if ( !id.equals( cacheKey.id ) ) {
+            if ( !scope.equals( cacheKey.scope ) ) {
                 return false;
             }
-            if ( !Arrays.equals( types, cacheKey.types ) ) {
+
+            if ( !directedEdgeMeta.equals( cacheKey.directedEdgeMeta ) ) {
                 return false;
             }
 
+
             return true;
         }
 
 
         @Override
         public int hashCode() {
-            int result = id.hashCode();
-            result = 31 * result + Arrays.hashCode( types );
+            int result = scope.hashCode();
+            result = 31 * result + directedEdgeMeta.hashCode();
             return result;
         }
     }
@@ -220,38 +233,140 @@ public class NodeShardCacheImpl implements NodeShardCache {
     /**
      * An entry for the shard.
      */
-    private static class CacheEntry {
+    public static final class CacheEntry {
         /**
          * Get the list of all segments
          */
-        private TreeSet<Long> shards;
+        private TreeMap<Long, ShardEntryGroup> shards;
 
 
-        private CacheEntry( final Iterator<Long> shards ) {
-            this.shards = new TreeSet<>( );
+        private CacheEntry( final Iterator<ShardEntryGroup> shards ) {
+            Preconditions.checkArgument( shards.hasNext(),
+                    "At least 1 entry must be present in the shard to load into cache" );
 
-            for ( Long shard : IterableUtil.wrap( shards ) ) {
-                this.shards.add( shard );
+            this.shards = new TreeMap<>();
+            /**
+             * TODO, we need to bound this.  While I don't envision more than a thousand groups max,
+             * we don't want 1 entry to use all our ram
+             */
+            for ( ShardEntryGroup shard : IterableUtil.wrap( shards ) ) {
+                this.shards.put( shard.getMinShard().getShardIndex(), shard );
             }
         }
 
 
         /**
-         * Get the shard's UUID for the uuid we're attempting to seek from
+         * Return the number of shard groups held in this cache entry
          */
-        public Long getShardId( final Long seek ) {
-            return this.shards.floor( seek );
+        public int getCacheSize() {
+            return this.shards.size();
         }
 
 
         /**
          * Get all shards <= this one in descending order
-         * @return
          */
-        public Iterator<Long> getShards( final Long maxShard ){
-            return this.shards.headSet(maxShard, true  ).descendingIterator();
+        public Iterator<ShardEntryGroup> getShards( final Long maxShard ) {
+
+            final Long firstKey = shards.floorKey( maxShard );
+
+            return shards.headMap( firstKey, true ).descendingMap().values().iterator();
+        }
+
+
+        /**
+         * Get the shard entry that should hold this value
+         */
+        public ShardEntryGroup getShardId( final Long seek ) {
+            Map.Entry<Long, ShardEntryGroup> entry = shards.floorEntry( seek );
+
+            if ( entry == null ) {
+                throw new NullPointerException( "Entry should never be null, this is a bug" );
+            }
+
+            return entry.getValue();
         }
     }
 
 
+    /**
+     * Load the cache entries from the shards we have stored
+     */
+    final class ShardCacheLoader extends CacheLoader<CacheKey, CacheEntry> {
+
+
+        @Override
+        public CacheEntry load( final CacheKey key ) throws Exception {
+
+
+            final Iterator<ShardEntryGroup> edges =
+                    nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta );
+
+            return new CacheEntry( edges );
+        }
+    }
+
+
+    /**
+     * Calculates the weight of the entry by getting the number of groups it holds
+     */
+    final class ShardWeigher implements Weigher<CacheKey, CacheEntry> {
+
+        @Override
+        public int weigh( final CacheKey key, final CacheEntry value ) {
+            return value.getCacheSize();
+        }
+    }
+
+
+    /**
+     * On removal from the cache, we want to audit the maximum shard.  If it needs to allocate a new shard, we want to
+     * do so.  If there's a compaction pending, we want to run the compaction task
+     */
+    final class ShardRemovalListener implements RemovalListener<CacheKey, CacheEntry> {
+
+        @Override
+        public void onRemoval( final RemovalNotification<CacheKey, CacheEntry> notification ) {
+
+
+            final CacheKey key = notification.getKey();
+            final CacheEntry entry = notification.getValue();
+
+
+            Iterator<ShardEntryGroup> groups = entry.getShards( Long.MAX_VALUE );
+
+
+            /**
+             * Start at our max shard group and audit each group in descending order
+             */
+
+            //audit all our groups
+            while ( groups.hasNext() ) {
+                ShardEntryGroup group = groups.next();
+
+                /**
+                 * We don't have a compaction pending.  Run an audit on the shards
+                 */
+                if ( !group.isCompactionPending() ) {
+                    LOG.debug( "No compaction pending, checking max shard on expiration" );
+                    /**
+                     * Check if we should allocate, we may want to
+                     */
+
+
+                    nodeShardAllocation.auditShard( key.scope, group, key.directedEdgeMeta );
+                    continue;
+                }
+                /**
+                 * Do the compaction
+                 */
+                if ( group.shouldCompact( timeservice.getCurrentTime() ) ) {
+                    //launch the compaction
+                }
+
+                //no op, there's nothing we need to do to this shard
+
+            }
+        }
+    }
 }
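
The cache wiring above, reduced to a runnable sketch (String keys and int[] values stand in for CacheKey/CacheEntry, and the loader and listener bodies are placeholders): entries are bounded by total weight rather than entry count, each entry weighs its group count, and eviction is the hook that triggers the shard audit.

    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import com.google.common.cache.RemovalListener;
    import com.google.common.cache.RemovalNotification;
    import com.google.common.cache.Weigher;

    public class WeightedCacheSketch {

        public static void main( String[] args ) throws Exception {

            final LoadingCache<String, int[]> cache = CacheBuilder.newBuilder()
                    //expire entries so shard allocation is re-audited periodically
                    .expireAfterWrite( 30, TimeUnit.SECONDS )
                    //bound total memory by weight, not by entry count
                    .maximumWeight( 10000 )
                    .weigher( new Weigher<String, int[]>() {
                        @Override
                        public int weigh( final String key, final int[] value ) {
                            //an entry's weight is the number of elements it holds
                            return value.length;
                        }
                    } )
                    .removalListener( new RemovalListener<String, int[]>() {
                        @Override
                        public void onRemoval( final RemovalNotification<String, int[]> n ) {
                            //the real listener audits and compacts shard groups here
                            System.out.println( "evicted " + n.getKey() );
                        }
                    } )
                    .build( new CacheLoader<String, int[]>() {
                        @Override
                        public int[] load( final String key ) {
                            //stand-in for loading shard groups from Cassandra
                            return new int[] { 1, 2, 3 };
                        }
                    } );

            System.out.println( cache.get( "node1" ).length ); //prints 3
        }
    }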

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java
new file mode 100644
index 0000000..1edaf21
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java
@@ -0,0 +1,63 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+
+
+/**
+ * Class to perform serialization for row keys from edges
+ */
+public class RowSerializer implements CompositeFieldSerializer<RowKey> {
+
+    private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+
+    @Override
+    public void toComposite( final CompositeBuilder builder, final RowKey key ) {
+
+        //add the row id to the composite
+        ID_SER.toComposite( builder, key.nodeId );
+
+        builder.addString( key.edgeType );
+        builder.addLong( key.shardId );
+    }
+
+
+    @Override
+    public RowKey fromComposite( final CompositeParser composite ) {
+
+        final Id id = ID_SER.fromComposite( composite );
+        final String edgeType = composite.readString();
+        final long shard = composite.readLong();
+
+
+        return new RowKey( id, edgeType, shard );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java
new file mode 100644
index 0000000..2edea56
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java
@@ -0,0 +1,62 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+
+
+public class RowTypeSerializer implements CompositeFieldSerializer<RowKeyType> {
+
+    private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+
+    @Override
+    public void toComposite( final CompositeBuilder builder, final RowKeyType keyType ) {
+
+        //add the row id to the composite
+        ID_SER.toComposite( builder, keyType.nodeId );
+
+        builder.addString( keyType.edgeType );
+        builder.addString( keyType.idType );
+
+        builder.addLong( keyType.shardId );
+    }
+
+
+    @Override
+    public RowKeyType fromComposite( final CompositeParser composite ) {
+
+        final Id id = ID_SER.fromComposite( composite );
+        final String edgeType = composite.readString();
+        final String idType = composite.readString();
+        final long shard = composite.readLong();
+
+        return new RowKeyType( id, edgeType, idType, shard);
+    }
+}
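
Both serializers above rely on one invariant: fromComposite must read fields back in exactly the order toComposite wrote them. A minimal illustration of that symmetry, using the JDK's data streams rather than Astyanax composites:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class CompositeOrderSketch {

        //write fields in a fixed order: edge type first, then shard id...
        static byte[] toBytes( final String edgeType, final long shardId ) throws IOException {
            final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            final DataOutputStream out = new DataOutputStream( bytes );
            out.writeUTF( edgeType );
            out.writeLong( shardId );
            return bytes.toByteArray();
        }

        //...and read them back in exactly the same order
        public static void main( String[] args ) throws IOException {
            final DataInputStream in =
                    new DataInputStream( new ByteArrayInputStream( toBytes( "likes", 42L ) ) );

            System.out.println( in.readUTF() + " " + in.readLong() ); //prints likes 42
        }
    }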

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
new file mode 100644
index 0000000..c8a884b
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
@@ -0,0 +1,100 @@
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.commons.collections4.iterators.PushbackIterator;
+
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Utility class that will take an iterator of all shards, and combine them into an iterator
+ * of ShardEntryGroups.  These groups can then be used in a distributed system to handle concurrent reads and writes
+ */
+public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
+
+
+    private ShardEntryGroup next;
+    private final PushbackIterator<Shard> sourceIterator;
+    private final long minDelta;
+
+
+    /**
+     * Create a shard iterator
+     * @param shardIterator The iterator of all shards.  Order is expected to be by shard index, descending from Long.MAX to Long.MIN
+     * @param minDelta The minimum delta we allow to consider shards the same group
+     */
+    public ShardEntryGroupIterator( final Iterator<Shard> shardIterator, final long minDelta ) {
+        Preconditions.checkArgument(shardIterator.hasNext(), "Shard iterator must have shards present");
+        this.sourceIterator = new PushbackIterator( shardIterator );
+        this.minDelta = minDelta;
+    }
+
+
+    @Override
+    public boolean hasNext() {
+        if ( next == null ) {
+            advance();
+        }
+
+        return next != null;
+    }
+
+
+    @Override
+    public ShardEntryGroup next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "No more elements exist in iterator" );
+        }
+
+
+        final ShardEntryGroup toReturn = next;
+
+        next = null;
+
+        return toReturn;
+    }
+
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException( "Remove is not supported" );
+    }
+
+
+    /**
+     * Advance to the next element
+     */
+    private void advance() {
+
+        /**
+         * We loop until we've exhausted our source, or until we hit a shard that no longer fits in the
+         * current group, meaning its delta from the group exceeds the minimum
+         */
+        while ( sourceIterator.hasNext() ) {
+
+            if(next == null){
+                next = new ShardEntryGroup( minDelta );
+            }
+
+            final Shard shard = sourceIterator.next();
+
+
+            //if the shard fits within the minimum delta, add it to the current group and keep consuming
+            if ( next.addShard( shard ) ) {
+                continue;
+            }
+
+
+            //it doesn't fit in this group; push it back so it begins the next group, then stop
+            sourceIterator.pushback( shard );
+            break;
+        }
+
+
+    }
+}
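
The pushback in advance() generalizes to any chunking of a sorted stream: read ahead one element, and when it doesn't belong to the current chunk, push it back so the next pass starts with it. A standalone sketch with plain integers and the same commons-collections4 PushbackIterator:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.commons.collections4.iterators.PushbackIterator;

    public class PushbackSketch {

        public static void main( String[] args ) {

            final PushbackIterator<Integer> source =
                    new PushbackIterator<>( Arrays.asList( 9, 8, 7, 3, 2 ).iterator() );

            //chunk descending values: consecutive elements within a delta of 2 group together
            while ( source.hasNext() ) {
                final List<Integer> group = new ArrayList<>();
                group.add( source.next() );

                while ( source.hasNext() ) {
                    final Integer candidate = source.next();

                    if ( group.get( group.size() - 1 ) - candidate <= 2 ) {
                        group.add( candidate );
                    }
                    else {
                        //doesn't fit this chunk; re-read it on the next pass
                        source.pushback( candidate );
                        break;
                    }
                }
                System.out.println( group ); //prints [9, 8, 7] then [3, 2]
            }
        }
    }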

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
new file mode 100644
index 0000000..48336cd
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -0,0 +1,75 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Collection;
+
+import org.apache.usergrid.persistence.core.consistency.TimeService;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+
+
+/**
+ * Implementation of the shard group compaction
+ */
+@Singleton
+public class ShardGroupCompactionImpl implements ShardGroupCompaction {
+
+
+    private final TimeService timeService;
+
+
+    @Inject
+    public ShardGroupCompactionImpl( final TimeService timeService ) {
+        this.timeService = timeService;
+    }
+
+
+    @Override
+    public Observable<Integer> compact( final ShardEntryGroup group ) {
+        final long startTime = timeService.getCurrentTime();
+
+        Preconditions.checkNotNull( group, "group cannot be null" );
+        Preconditions.checkArgument( group.isCompactionPending(), "Compaction must be pending" );
+        Preconditions.checkArgument( group.shouldCompact( startTime ), "Compaction must be eligible to run" );
+
+
+        final Shard targetShard = group.getCompactionTarget();
+
+        final Collection<Shard> sourceShards = group.getReadShards();
+
+
+        //now get iterators for each of the source shards, and then copy them to the compaction target shard
+
+
+
+
+        return null;
+
+    }
+}
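
compact() above is still a stub. As a rough sketch of the copy phase its comment describes (in-memory lists stand in for shard rows; the real implementation would read and write edges through the serialization layer, so everything here is hypothetical):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class CompactionSketch {

        public static void main( String[] args ) {

            //shards modeled as in-memory edge lists
            final Map<String, List<String>> shards = new HashMap<>();
            shards.put( "source-1", new ArrayList<>( Arrays.asList( "edgeA", "edgeB" ) ) );
            shards.put( "source-2", new ArrayList<>( Arrays.asList( "edgeC" ) ) );
            shards.put( "target", new ArrayList<String>() );

            int copied = 0;

            //copy every edge from the read shards into the compaction target,
            //then clear each source so readers converge on the target shard
            for ( String source : Arrays.asList( "source-1", "source-2" ) ) {
                for ( String edge : shards.get( source ) ) {
                    shards.get( "target" ).add( edge );
                    copied++;
                }
                shards.get( source ).clear();
            }

            System.out.println( copied + " edges compacted -> " + shards.get( "target" ) );
            //prints: 3 edges compacted -> [edgeA, edgeB, edgeC]
        }
    }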

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
new file mode 100644
index 0000000..0bbb011
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
@@ -0,0 +1,134 @@
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.MultiKeyColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.model.ConsistencyLevel;
+import com.netflix.astyanax.query.RowQuery;
+import com.netflix.astyanax.util.RangeBuilder;
+
+
+/**
+ * Internal iterator to iterate over multiple row keys
+ *
+ * @param <R> The row type
+ * @param <C> The column type
+ * @param <T> The parsed return type
+ */
+public class ShardRowIterator<R, C, T> implements Iterator<T> {
+
+    private final EdgeSearcher<R, C, T> searcher;
+
+    private final MultiTennantColumnFamily<ApplicationScope, R, C> cf;
+
+    private Iterator<T> currentColumnIterator;
+
+    private final Keyspace keyspace;
+
+    private final int pageSize;
+
+    private final ConsistencyLevel consistencyLevel;
+
+
+    public ShardRowIterator( final EdgeSearcher<R, C, T> searcher,
+                             final MultiTennantColumnFamily<ApplicationScope, R, C> cf, final Keyspace keyspace,
+                             final ConsistencyLevel consistencyLevel, final int pageSize ) {
+        this.searcher = searcher;
+        this.cf = cf;
+        this.keyspace = keyspace;
+        this.pageSize = pageSize;
+        this.consistencyLevel = consistencyLevel;
+    }
+
+
+    @Override
+    public boolean hasNext() {
+        //we have more columns to return
+        if ( currentColumnIterator != null && currentColumnIterator.hasNext() ) {
+            return true;
+        }
+
+        /**
+         * We have another row key, advance to it and re-check
+         */
+        if ( searcher.hasNext() ) {
+            advanceRow();
+            return hasNext();
+        }
+
+        //we have no more columns, and no more row keys, we're done
+        return false;
+    }
+
+
+    @Override
+    public T next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "There are no more rows or columns left to advance" );
+        }
+
+        return currentColumnIterator.next();
+    }
+
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException( "Remove is unsupported" );
+    }
+
+
+    /**
+     * Advance our iterator to the next row (assumes the check for row keys is elsewhere)
+     */
+    private void advanceRow() {
+
+        /**
+         * If the edge is present, we need to begin seeking from it
+         */
+
+        final RangeBuilder rangeBuilder = new RangeBuilder().setLimit( pageSize );
+
+
+        //set the range into the search
+        searcher.setRange( rangeBuilder );
+
+        /**
+         * Get our list of row keys to query
+         */
+        final List<ScopedRowKey<ApplicationScope, R>> rowKeys = searcher.next();
+
+
+        final List<ColumnNameIterator<C, T>> columnNameIterators = new ArrayList<>( rowKeys.size() );
+
+        for(ScopedRowKey<ApplicationScope, R> rowKey: rowKeys){
+
+
+
+            final RowQuery<ScopedRowKey<ApplicationScope, R>, C> query =
+                    keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKey( rowKey )
+                            .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
+
+
+            final ColumnNameIterator<C, T> columnNameIterator = new ColumnNameIterator<>( query, searcher, searcher.hasPage() );
+
+            columnNameIterators.add( columnNameIterator );
+
+        }
+
+
+
+        currentColumnIterator = new MultiKeyColumnNameIterator<>(columnNameIterators, searcher, pageSize);
+
+
+    }
+}
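
The row-advance pattern above, reduced to a standalone sketch: hold a single column iterator, and when it is exhausted pull the next row and rebuild it (plain lists of strings stand in for Cassandra row queries; note how the recursive hasNext() skips empty rows):

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;

    public class MultiRowIteratorSketch implements Iterator<String> {

        private final Iterator<List<String>> rows;       //stand-in for the EdgeSearcher
        private Iterator<String> currentColumnIterator;  //columns of the current row

        MultiRowIteratorSketch( final Iterator<List<String>> rows ) {
            this.rows = rows;
        }

        @Override
        public boolean hasNext() {
            //more columns remain in the current row
            if ( currentColumnIterator != null && currentColumnIterator.hasNext() ) {
                return true;
            }
            //advance to the next row and re-check; empty rows fall through
            if ( rows.hasNext() ) {
                currentColumnIterator = rows.next().iterator();
                return hasNext();
            }
            return false;
        }

        @Override
        public String next() {
            if ( !hasNext() ) {
                throw new NoSuchElementException();
            }
            return currentColumnIterator.next();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException( "Remove is unsupported" );
        }

        public static void main( String[] args ) {
            final Iterator<String> it = new MultiRowIteratorSketch( Arrays.asList(
                    Arrays.asList( "a", "b" ), Arrays.<String>asList(), Arrays.asList( "c" ) ).iterator() );

            while ( it.hasNext() ) {
                System.out.print( it.next() + " " ); //prints: a b c
            }
        }
    }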

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
new file mode 100644
index 0000000..6f7c5ce
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
@@ -0,0 +1,654 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import javax.inject.Inject;
+
+import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.consistency.TimeService;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.ValidationUtils;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.SearchByIdType;
+import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.Serializer;
+import com.netflix.astyanax.util.RangeBuilder;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
+@Singleton
+public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
+
+    protected final Keyspace keyspace;
+    protected final CassandraConfig cassandraConfig;
+    protected final GraphFig graphFig;
+    protected final EdgeShardStrategy writeEdgeShardStrategy;
+    protected final TimeService timeService;
+
+
+    @Inject
+    public ShardedEdgeSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
+                                         final GraphFig graphFig, final EdgeShardStrategy writeEdgeShardStrategy,
+                                         final TimeService timeService ) {
+
+
+        checkNotNull( keyspace, "keyspace required" );
+        checkNotNull( cassandraConfig, "cassandraConfig required" );
+        checkNotNull( graphFig, "graphFig required" );
+        checkNotNull( writeEdgeShardStrategy, "writeEdgeShardStrategy required" );
+        checkNotNull( timeService, "timeService required" );
+
+
+        this.keyspace = keyspace;
+        this.cassandraConfig = cassandraConfig;
+        this.graphFig = graphFig;
+        this.writeEdgeShardStrategy = writeEdgeShardStrategy;
+        this.timeService = timeService;
+    }
+
+
+    @Override
+    public MutationBatch writeEdge( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                    final MarkedEdge markedEdge, final UUID timestamp  ) {
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateEdge( markedEdge );
+        ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+
+        final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() )
+                                            .withTimestamp( timestamp.timestamp() );
+
+        final boolean isDeleted = markedEdge.isDeleted();
+
+
+        doWrite( columnFamilies, scope, markedEdge, new RowOp<RowKey>() {
+            @Override
+            public void writeEdge( final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily,
+                                   final RowKey rowKey, final DirectedEdge edge ) {
+                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( edge, isDeleted );
+            }
+
+
+            @Override
+            public void countEdge( final Shard shard, final DirectedEdgeMeta directedEdgeMeta ) {
+               if(!isDeleted) {
+                   writeEdgeShardStrategy.increment( scope, shard, 1,  directedEdgeMeta );
+               }
+            }
+
+
+
+            @Override
+            public void writeVersion( final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily,
+                                      final EdgeRowKey rowKey, final long timestamp ) {
+                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( timestamp, isDeleted );
+            }
+        } );
+
+
+        return batch;
+    }
+
+
+    @Override
+    public MutationBatch deleteEdge( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                     final MarkedEdge markedEdge, final UUID timestamp ) {
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateEdge( markedEdge );
+        ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+
+        final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() )
+                                            .withTimestamp( timestamp.timestamp() );
+
+
+        doWrite( columnFamilies, scope, markedEdge, new RowOp<RowKey>() {
+            @Override
+            public void writeEdge( final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily,
+                                   final RowKey rowKey, final DirectedEdge edge ) {
+                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( edge );
+            }
+
+
+            @Override
+            public void countEdge(  final Shard shard, final DirectedEdgeMeta directedEdgeMeta ) {
+                writeEdgeShardStrategy.increment( scope,  shard, -1, directedEdgeMeta );
+            }
+
+
+            @Override
+            public void writeVersion( final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily,
+                                      final EdgeRowKey rowKey, final long timestamp ) {
+                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( timestamp );
+            }
+        } );
+
+
+        return batch;
+    }
+
+
+    /**
+     * Write the edge into each column family internally
+     *
+     * @param columnFamilies The column families to write to
+     * @param scope The scope to encapsulate
+     * @param edge The edge to write
+     * @param op The row operation to invoke
+     */
+    private void doWrite( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope, final MarkedEdge edge,
+                          final RowOp op ) {
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateEdge( edge );
+
+        final Id sourceNodeId = edge.getSourceNode();
+        final String sourceNodeType = sourceNodeId.getType();
+        final Id targetNodeId = edge.getTargetNode();
+        final String targetNodeType = targetNodeId.getType();
+        final long timestamp = edge.getTimestamp();
+        final String type = edge.getType();
+
+
+        /**
+         * Write edges from source->target
+         */
+
+
+        final long time = timeService.getCurrentTime();
+
+        final DirectedEdge sourceEdge = new DirectedEdge( targetNodeId, timestamp );
+
+        final DirectedEdgeMeta sourceEdgeMeta =  DirectedEdgeMeta.fromSourceNode( sourceNodeId, type );
+
+
+
+        final ShardEntryGroup sourceRowKeyShard =
+                writeEdgeShardStrategy.getWriteShards( scope, timestamp, sourceEdgeMeta );
+
+        final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> sourceCf =
+                columnFamilies.getSourceNodeCfName();
+
+
+        for ( Shard shard : sourceRowKeyShard.getWriteShards(time) ) {
+
+            final long shardId = shard.getShardIndex();
+            final RowKey sourceRowKey = new RowKey( sourceNodeId, type, shardId );
+            op.writeEdge( sourceCf, sourceRowKey, sourceEdge );
+            op.countEdge( shard, sourceEdgeMeta );
+        }
+
+
+
+        final DirectedEdgeMeta sourceEdgeTargetTypeMeta =  DirectedEdgeMeta.fromSourceNodeTargetType( sourceNodeId,
+                type, targetNodeType );
+
+
+        final ShardEntryGroup sourceWithTypeRowKeyShard = writeEdgeShardStrategy
+                .getWriteShards( scope, timestamp, sourceEdgeTargetTypeMeta );
+
+        final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> targetCf =
+                columnFamilies.getSourceNodeTargetTypeCfName();
+
+        for ( Shard shard : sourceWithTypeRowKeyShard.getWriteShards(time) ) {
+
+            final long shardId = shard.getShardIndex();
+            final RowKeyType sourceRowKeyType = new RowKeyType( sourceNodeId, type, targetNodeId, shardId );
+
+            op.writeEdge( targetCf, sourceRowKeyType, sourceEdge );
+            op.countEdge( shard, sourceEdgeTargetTypeMeta );
+        }
+
+
+        /**
+         * Write edges from target<-source
+         */
+
+        final DirectedEdge targetEdge = new DirectedEdge( sourceNodeId, timestamp );
+
+        final DirectedEdgeMeta targetEdgeMeta =  DirectedEdgeMeta.fromTargetNode( targetNodeId, type );
+
+
+
+        final ShardEntryGroup targetRowKeyShard =
+                writeEdgeShardStrategy.getWriteShards( scope, timestamp, targetEdgeMeta );
+
+        final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> sourceByTargetCf =
+                columnFamilies.getTargetNodeCfName();
+
+        for ( Shard shard : targetRowKeyShard.getWriteShards(time) ) {
+            final long shardId = shard.getShardIndex();
+            final RowKey targetRowKey = new RowKey( targetNodeId, type, shardId );
+
+            op.writeEdge( sourceByTargetCf, targetRowKey, targetEdge );
+            op.countEdge( shard, targetEdgeMeta );
+        }
+
+
+
+        final DirectedEdgeMeta targetEdgeSourceTypeMeta =  DirectedEdgeMeta.fromTargetNodeSourceType( targetNodeId, type, sourceNodeType );
+
+
+        final ShardEntryGroup targetWithTypeRowKeyShard = writeEdgeShardStrategy
+                .getWriteShards( scope, timestamp, targetEdgeSourceTypeMeta );
+
+        final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> targetBySourceCf =
+                columnFamilies.getTargetNodeSourceTypeCfName();
+
+
+        for ( Shard shard : targetWithTypeRowKeyShard.getWriteShards(time) ) {
+
+            final long shardId = shard.getShardIndex();
+
+            final RowKeyType targetRowKeyType = new RowKeyType( targetNodeId, type, sourceNodeId, shardId );
+
+
+            op.writeEdge( targetBySourceCf, targetRowKeyType, targetEdge );
+            op.countEdge( shard, targetEdgeSourceTypeMeta );
+        }
+
+        /**
+         * Always shard 0: we hard-limit a single source->target edge to ~2 billion timestamps
+         */
+        final EdgeRowKey edgeRowKey = new EdgeRowKey( sourceNodeId, type, targetNodeId, 0L );
+
+
+        /**
+         * Write this in the timestamp log for this edge of source->target
+         */
+        op.writeVersion( columnFamilies.getGraphEdgeVersions(), edgeRowKey, timestamp );
+    }
+
+
+    @Override
+    public Iterator<MarkedEdge> getEdgeVersions( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                                 final SearchByEdge search, final Iterator<ShardEntryGroup> shards ) {
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateSearchByEdge( search );
+
+        final Id targetId = search.targetNode();
+        final Id sourceId = search.sourceNode();
+        final String type = search.getType();
+        final long maxTimestamp = search.getMaxTimestamp();
+        final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily =
+                columnFamilies.getGraphEdgeVersions();
+        final Serializer<Long> serializer = columnFamily.getColumnSerializer();
+
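+        //versions are only ever written to shard 0 (see doWrite), one Long timestamp column per version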
+        final EdgeSearcher<EdgeRowKey, Long, MarkedEdge> searcher =
+                new EdgeSearcher<EdgeRowKey, Long, MarkedEdge>( scope, maxTimestamp, search.last(), shards ) {
+
+                    @Override
+                    protected Serializer<Long> getSerializer() {
+                        return serializer;
+                    }
+
+
+                    @Override
+                    public void setRange( final RangeBuilder builder ) {
+
+
+                        if ( last.isPresent() ) {
+                            super.setRange( builder );
+                            return;
+                        }
+
+                        //start seeking at a value < our max version
+                        builder.setStart( maxTimestamp );
+                    }
+
+
+                    @Override
+                    protected EdgeRowKey generateRowKey( long shard ) {
+                        return new EdgeRowKey( sourceId, type, targetId, shard );
+                    }
+
+
+                    @Override
+                    protected Long getStartColumn( final Edge last ) {
+                        return last.getTimestamp();
+                    }
+
+
+                    @Override
+                    protected MarkedEdge createEdge( final Long column, final boolean marked ) {
+                        return new SimpleMarkedEdge( sourceId, type, targetId, column.longValue(), marked );
+                    }
+
+
+                    @Override
+                    public int compare( final MarkedEdge o1, final MarkedEdge o2 ) {
+                        return Long.compare( o1.getTimestamp(), o2.getTimestamp() );
+                    }
+                };
+
+        return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+                graphFig.getScanPageSize() );
+    }
+
+
+    @Override
+    public Iterator<MarkedEdge> getEdgesFromSource( final EdgeColumnFamilies columnFamilies,
+                                                    final ApplicationScope scope, final SearchByEdgeType edgeType,
+                                                    final Iterator<ShardEntryGroup> shards ) {
+
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateSearchByEdgeType( edgeType );
+
+        final Id sourceId = edgeType.getNode();
+        final String type = edgeType.getType();
+        final long maxTimestamp = edgeType.getMaxTimestamp();
+        final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily =
+                columnFamilies.getSourceNodeCfName();
+        final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
+
+
+        final SourceEdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
+                new SourceEdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+
+
+                    @Override
+                    protected Serializer<DirectedEdge> getSerializer() {
+                        return serializer;
+                    }
+
+
+                    @Override
+                    protected RowKey generateRowKey( long shard ) {
+                        return new RowKey( sourceId, type, shard );
+                    }
+
+
+                    @Override
+                    protected DirectedEdge getStartColumn( final Edge last ) {
+                        return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
+                    }
+
+
+                    @Override
+                    protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
+                        return new SimpleMarkedEdge( sourceId, type, edge.id, edge.timestamp, marked );
+                    }
+
+
+                };
+
+
+        return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+                graphFig.getScanPageSize() );
+    }
+
+
+    @Override
+    public Iterator<MarkedEdge> getEdgesFromSourceByTargetType( final EdgeColumnFamilies columnFamilies,
+                                                                final ApplicationScope scope,
+                                                                final SearchByIdType edgeType,
+                                                                final Iterator<ShardEntryGroup> shards ) {
+
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateSearchByEdgeType( edgeType );
+
+        final Id targetId = edgeType.getNode();
+        final String type = edgeType.getType();
+        final String targetType = edgeType.getIdType();
+        final long maxTimestamp = edgeType.getMaxTimestamp();
+        final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily =
+                columnFamilies.getSourceNodeTargetTypeCfName();
+        final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
+
+
+        final SourceEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
+                new SourceEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+
+                    @Override
+                    protected Serializer<DirectedEdge> getSerializer() {
+                        return serializer;
+                    }
+
+
+                    @Override
+                    protected RowKeyType generateRowKey( long shard ) {
+                        return new RowKeyType( targetId, type, targetType, shard );
+                    }
+
+
+                    @Override
+                    protected DirectedEdge getStartColumn( final Edge last ) {
+                        return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
+                    }
+
+
+                    @Override
+                    protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
+                        return new SimpleMarkedEdge( targetId, type, edge.id, edge.timestamp, marked );
+                    }
+
+
+
+                };
+
+        return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+                graphFig.getScanPageSize() );
+    }
+
+
+
+
+    @Override
+    public Iterator<MarkedEdge> getEdgesToTarget( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                                  final SearchByEdgeType edgeType,
+                                                  final Iterator<ShardEntryGroup> shards ) {
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateSearchByEdgeType( edgeType );
+
+        final Id targetId = edgeType.getNode();
+        final String type = edgeType.getType();
+        final long maxTimestamp = edgeType.getMaxTimestamp();
+        final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily =
+                columnFamilies.getTargetNodeCfName();
+        final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
+
+        final TargetEdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
+                new TargetEdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+
+                    @Override
+                    protected Serializer<DirectedEdge> getSerializer() {
+                        return serializer;
+                    }
+
+
+                    @Override
+                    protected RowKey generateRowKey( long shard ) {
+                        return new RowKey( targetId, type, shard );
+                    }
+
+
+                    @Override
+                    protected DirectedEdge getStartColumn( final Edge last ) {
+                        return new DirectedEdge( last.getSourceNode(), last.getTimestamp() );
+                    }
+
+
+                    @Override
+                    protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
+                        return new SimpleMarkedEdge( edge.id, type, targetId, edge.timestamp, marked );
+                    }
+                };
+
+
+        return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+                graphFig.getScanPageSize() );
+    }
+
+
+    @Override
+    public Iterator<MarkedEdge> getEdgesToTargetBySourceType( final EdgeColumnFamilies columnFamilies,
+                                                              final ApplicationScope scope,
+                                                              final SearchByIdType edgeType,
+                                                              final Iterator<ShardEntryGroup> shards ) {
+
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateSearchByEdgeType( edgeType );
+
+        final Id targetId = edgeType.getNode();
+        final String sourceType = edgeType.getIdType();
+        final String type = edgeType.getType();
+        final long maxTimestamp = edgeType.getMaxTimestamp();
+        final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily =
+                columnFamilies.getTargetNodeSourceTypeCfName();
+        final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
+
+
+        final TargetEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
+                new TargetEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+                    @Override
+                    protected Serializer<DirectedEdge> getSerializer() {
+                        return serializer;
+                    }
+
+
+                    @Override
+                    protected RowKeyType generateRowKey( final long shard ) {
+                        return new RowKeyType( targetId, type, sourceType, shard );
+                    }
+
+
+                    @Override
+                    protected DirectedEdge getStartColumn( final Edge last ) {
+                        //columns in target-keyed rows hold the source node, so resume from it
+                        return new DirectedEdge( last.getSourceNode(), last.getTimestamp() );
+                    }
+
+
+                    @Override
+                    protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
+                        return new SimpleMarkedEdge( edge.id, type, targetId, edge.timestamp, marked );
+                    }
+                };
+
+        return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+                graphFig.getScanPageSize() );
+    }
+
+
+    /**
+     * Class for performing searches on rows keyed by source id
+     */
+    private static abstract class SourceEdgeSearcher<R, C, T extends Edge> extends EdgeSearcher<R, C, T>{
+
+        protected SourceEdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional<Edge> last,
+                                      final Iterator<ShardEntryGroup> shards ) {
+            super( scope, maxTimestamp, last, shards );
+        }
+
+
+        public int compare( final T o1, final T o2 ) {
+            int compare = Long.compare( o1.getTimestamp(), o2.getTimestamp() );
+
+            if ( compare == 0 ) {
+                compare = o1.getTargetNode().compareTo( o2.getTargetNode() );
+            }
+
+            return compare;
+        }
+
+
+    }
+
+
+    /**
+     * Class for performing searches on rows keyed by target id
+     */
+    private static abstract class TargetEdgeSearcher<R, C, T extends Edge> extends EdgeSearcher<R, C, T>{
+
+        protected TargetEdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional<Edge> last,
+                                      final Iterator<ShardEntryGroup> shards ) {
+            super( scope, maxTimestamp, last, shards );
+        }
+
+
+        public int compare( final T o1, final T o2 ) {
+            int compare = Long.compare( o1.getTimestamp(), o2.getTimestamp() );
+
+            if ( compare == 0 ) {
+                //rows here are keyed by target, so ties are broken on the source node
+                compare = o1.getSourceNode().compareTo( o2.getSourceNode() );
+            }
+
+            return compare;
+        }
+
+
+    }
+
+    /**
+     * Simple callback to perform puts and deletes with common row setup code
+     *
+     * @param <R> The row key type
+     */
+    private interface RowOp<R> {
+
+        /**
+         * Write the edge with the given data
+         */
+        void writeEdge( final MultiTennantColumnFamily<ApplicationScope, R, DirectedEdge> columnFamily, final  R rowKey,
+                        final DirectedEdge edge );
+
+        /**
+         * Perform the count on the edge
+         */
+        void countEdge( final Shard shard, final DirectedEdgeMeta directedEdgeMeta);
+
+        /**
+         * Write the edge into the version cf
+         */
+        void writeVersion( final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily,
+                           final EdgeRowKey rowKey, long timestamp );
+    }
+}
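
A minimal usage sketch of the write path in this file, assuming writeEdge mirrors the
deleteEdge signature shown above and that the collaborators are wired by Guice elsewhere;
the class and parameter names below are illustrative only:

import java.util.UUID;

import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;

import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;

public class EdgeWriteSketch {

    /**
     * Write an edge, then delete it with a later time uuid.  Both calls return an
     * un-executed MutationBatch, so several edge mutations can be merged before
     * hitting Cassandra.
     */
    public void writeThenDelete( final ShardedEdgeSerialization serialization,
                                 final EdgeColumnFamilies columnFamilies,
                                 final ApplicationScope scope, final MarkedEdge edge,
                                 final UUID writeUuid, final UUID deleteUuid )
            throws ConnectionException {

        //puts the edge column on every shard in the current write group
        final MutationBatch write = serialization.writeEdge( columnFamilies, scope, edge, writeUuid );
        write.execute();

        //removes the edge columns again and decrements the shard counters
        final MutationBatch delete = serialization.deleteEdge( columnFamilies, scope, edge, deleteUuid );
        delete.execute();
    }
}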

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
new file mode 100644
index 0000000..9050b0a
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
@@ -0,0 +1,150 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.DynamicCompositeType;
+
+import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.astyanax.OrganizationScopedRowKeySerializer;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
+
+import com.netflix.astyanax.serializers.LongSerializer;
+
+import static org.apache.usergrid.persistence.core.astyanax.ColumnTypes.LONG_TYPE_REVERSED;
+import static org.apache.usergrid.persistence.core.astyanax.ColumnTypes.UUID_TYPE_REVERSED;
+
+
+/**
+ * Size-based implementation of the edge column families
+ */
+public class SizebasedEdgeColumnFamilies implements EdgeColumnFamilies {
+
+
+    //Row key with no type
+    private static final RowSerializer ROW_SERIALIZER = new RowSerializer();
+
+    //row key with target id type
+    private static final RowTypeSerializer ROW_TYPE_SERIALIZER = new RowTypeSerializer();
+
+    private static final EdgeRowKeySerializer EDGE_ROW_KEY_SERIALIZER = new EdgeRowKeySerializer();
+
+    //Edge serializers
+    private static final EdgeSerializer EDGE_SERIALIZER = new EdgeSerializer();
+
+    private static final LongSerializer LONG_SERIALIZER = LongSerializer.get();
+
+    private static final String EDGE_DYNAMIC_COMPOSITE_TYPE =
+            //we purposefully associate lower case "l" and "u" with reversed types.  This way we can use
+            //the default serialization in Astyanax, but get reverse order in Cassandra
+            DynamicCompositeType.class.getSimpleName() + "(s=>UTF8Type,l=>" + LONG_TYPE_REVERSED + ",u=>"
+                    + UUID_TYPE_REVERSED + ")";
+
+
+    //initialize the CF's from our implementation
+    private static final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> SOURCE_NODE_EDGES =
+            new MultiTennantColumnFamily<>( "Graph_Source_Node_Edges",
+                    new OrganizationScopedRowKeySerializer<>( ROW_SERIALIZER ), EDGE_SERIALIZER );
+
+
+    private static final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> TARGET_NODE_EDGES =
+            new MultiTennantColumnFamily<>( "Graph_Target_Node_Edges",
+                    new OrganizationScopedRowKeySerializer<>( ROW_SERIALIZER ), EDGE_SERIALIZER );
+
+
+    private static final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> SOURCE_NODE_TARGET_TYPE =
+            new MultiTennantColumnFamily<>( "Graph_Source_Node_Target_Type",
+                    new OrganizationScopedRowKeySerializer<>( ROW_TYPE_SERIALIZER ), EDGE_SERIALIZER );
+
+
+    /**
+     * The edges that are to the target node with the source type.  The target node is the row key
+     */
+    private static final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> TARGET_NODE_SOURCE_TYPE =
+            new MultiTennantColumnFamily<>( "Graph_Target_Node_Source_Type",
+                    new OrganizationScopedRowKeySerializer<>( ROW_TYPE_SERIALIZER ), EDGE_SERIALIZER );
+
+
+    private static final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> EDGE_VERSIONS =
+            new MultiTennantColumnFamily<>( "Graph_Edge_Versions",
+                    new OrganizationScopedRowKeySerializer<>( EDGE_ROW_KEY_SERIALIZER ), LONG_SERIALIZER );
+
+
+    @Override
+    public MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> getSourceNodeCfName() {
+        return SOURCE_NODE_EDGES;
+    }
+
+
+    @Override
+    public MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> getTargetNodeCfName() {
+        return TARGET_NODE_EDGES;
+    }
+
+
+    @Override
+    public MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> getSourceNodeTargetTypeCfName() {
+        return SOURCE_NODE_TARGET_TYPE;
+    }
+
+
+    @Override
+    public MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> getTargetNodeSourceTypeCfName() {
+        return TARGET_NODE_SOURCE_TYPE;
+    }
+
+
+    @Override
+    public MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> getGraphEdgeVersions() {
+        return EDGE_VERSIONS;
+    }
+
+
+    @Override
+    public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
+        return Arrays
+                .asList( graphCf( SOURCE_NODE_EDGES ), graphCf( TARGET_NODE_EDGES ), graphCf( SOURCE_NODE_TARGET_TYPE ),
+                        graphCf( TARGET_NODE_SOURCE_TYPE ),
+                        new MultiTennantColumnFamilyDefinition( EDGE_VERSIONS, BytesType.class.getSimpleName(),
+                                ColumnTypes.LONG_TYPE_REVERSED, BytesType.class.getSimpleName(),
+                                MultiTennantColumnFamilyDefinition.CacheOption.KEYS ) );
+    }
+
+
+    /**
+     * Helper to generate an edge definition by the type
+     */
+    private MultiTennantColumnFamilyDefinition graphCf( MultiTennantColumnFamily cf ) {
+        return new MultiTennantColumnFamilyDefinition( cf, BytesType.class.getSimpleName(), EDGE_DYNAMIC_COMPOSITE_TYPE,
+                BytesType.class.getSimpleName(), MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
+    }
+}
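
A hedged bootstrap sketch for the definitions above; createColumnFamily is a hypothetical
stand-in for whatever migration code actually creates the column families:

import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;

public class ColumnFamilyBootstrapSketch {

    public void createAll( final EdgeColumnFamilies families ) {
        //each definition pairs a column family with its comparator and caching options
        for ( MultiTennantColumnFamilyDefinition definition : families.getColumnFamilies() ) {
            createColumnFamily( definition );
        }
    }

    private void createColumnFamily( final MultiTennantColumnFamilyDefinition definition ) {
        //hypothetical helper; real creation lives in the core migration code, not this patch
    }
}

Invoked as: new ColumnFamilyBootstrapSketch().createAll( new SizebasedEdgeColumnFamilies() );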

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
index f246f23..8787d97 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
@@ -23,10 +23,12 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 import java.util.Iterator;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
-import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -52,52 +54,21 @@ public class SizebasedEdgeShardStrategy implements EdgeShardStrategy {
 
 
     @Override
-    public long getWriteShard( final ApplicationScope scope, final Id rowKeyId, final long timestamp,
-                               final String... types ) {
-        return shardCache.getSlice( scope, rowKeyId, timestamp, types );
+    public ShardEntryGroup getWriteShards( final ApplicationScope scope,
+                                        final long timestamp, final DirectedEdgeMeta directedEdgeMeta ) {
+        return shardCache.getWriteShardGroup( scope, timestamp, directedEdgeMeta);
     }
 
 
     @Override
-    public Iterator<Long> getReadShards( final ApplicationScope scope, final Id rowKeyId, final long maxTimestamp,
-                                         final String... types ) {
-        return shardCache.getVersions( scope, rowKeyId, maxTimestamp, types );
+    public Iterator<ShardEntryGroup> getReadShards( final ApplicationScope scope, final long maxTimestamp, final DirectedEdgeMeta directedEdgeMeta ) {
+        return shardCache.getReadShardGroup( scope, maxTimestamp, directedEdgeMeta );
     }
 
 
     @Override
-    public void increment( final ApplicationScope scope, final Id rowKeyId, final long shardId, final long count,
-                           final String... types ) {
-        shardApproximation.increment( scope, rowKeyId, shardId, count, types );
-    }
-
-
-    @Override
-    public String getSourceNodeCfName() {
-        return "Graph_Source_Node_Edges";
-    }
-
-
-    @Override
-    public String getTargetNodeCfName() {
-        return "Graph_Target_Node_Edges";
-    }
-
-
-    @Override
-    public String getSourceNodeTargetTypeCfName() {
-        return "Graph_Source_Node_Target_Type";
-    }
-
-
-    @Override
-    public String getTargetNodeSourceTypeCfName() {
-        return "Graph_Target_Node_Source_Type";
-    }
-
-
-    @Override
-    public String getGraphEdgeVersions() {
-        return "Graph_Edge_Versions";
+    public void increment( final ApplicationScope scope, final Shard shard,
+                           final long count, final DirectedEdgeMeta directedEdgeMeta) {
+        shardApproximation.increment( scope, shard,  count, directedEdgeMeta );
     }
 }
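
A hedged read-path sketch showing how this strategy composes with the sharded edge
serialization above; the SearchByEdgeType instance and the injected collaborators are
assumed to be built elsewhere:

import java.util.Iterator;

import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
import org.apache.usergrid.persistence.model.entity.Id;

public class EdgeReadSketch {

    /**
     * Read all live edges of one type from a source node.
     */
    public void readFromSource( final EdgeShardStrategy shardStrategy,
                                final ShardedEdgeSerialization serialization,
                                final EdgeColumnFamilies columnFamilies,
                                final ApplicationScope scope, final Id sourceId,
                                final SearchByEdgeType search ) {

        //resolve the shard groups that may hold edges for this node and type
        final DirectedEdgeMeta meta = DirectedEdgeMeta.fromSourceNode( sourceId, search.getType() );
        final Iterator<ShardEntryGroup> shards =
                shardStrategy.getReadShards( scope, search.getMaxTimestamp(), meta );

        //iterate the matching edges across all shard groups
        final Iterator<MarkedEdge> edges =
                serialization.getEdgesFromSource( columnFamilies, scope, search, shards );

        while ( edges.hasNext() ) {
            final MarkedEdge edge = edges.next();
            if ( !edge.isDeleted() ) { //skip edges tombstoned by deleteEdge
                //consume the edge here
            }
        }
    }
}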

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SourceEdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SourceEdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SourceEdgeSearcher.java
new file mode 100644
index 0000000..b33fcaf
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SourceEdgeSearcher.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+/**
+ * Placeholder for a top-level source edge searcher.  The working implementation currently
+ * lives as the private SourceEdgeSearcher inner class of the sharded edge serialization impl.
+ */
+public class SourceEdgeSearcher {}