You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/05 00:14:59 UTC

[06/11] Finished refactor. Need to continue on shard allocation

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKey.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKey.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKey.java
deleted file mode 100644
index 12b1d7c..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKey.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-
-
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- * Used to store row keys by sourceId, targetId and edgeType
- */
-public class EdgeRowKey {
-    public final Id nodeId;
-    public final String[] edgeTypes;
-
-
-    public EdgeRowKey( final Id nodeId, final String[] edgeTypes ) {
-        this.nodeId = nodeId;
-        this.edgeTypes = edgeTypes;
-    }
-
-
-}
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java
index be97f95..90b264c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java
@@ -21,8 +21,8 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
 import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
 import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
 
 import com.netflix.astyanax.model.CompositeBuilder;
 import com.netflix.astyanax.model.CompositeParser;
@@ -31,9 +31,8 @@ import com.netflix.astyanax.model.CompositeParser;
 /**
  * Class to perform serialization for row keys from edges
  */
-public class EdgeRowKeySerializer implements CompositeFieldSerializer<EdgeRowKey> {
 
-    private static final EdgeRowKeySerializer INSTANCE = new EdgeRowKeySerializer();
+public class EdgeRowKeySerializer implements CompositeFieldSerializer<EdgeRowKey> {
 
     private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
 
@@ -42,13 +41,10 @@ public class EdgeRowKeySerializer implements CompositeFieldSerializer<EdgeRowKey
     public void toComposite( final CompositeBuilder builder, final EdgeRowKey key ) {
 
         //add the row id to the composite
-        ID_SER.toComposite( builder, key.nodeId );
-
-        builder.addInteger( key.edgeTypes.length );
-
-        for(String type: key.edgeTypes){
-            builder.addString( type );
-        }
+        ID_SER.toComposite( builder, key.sourceId );
+        builder.addString( key.edgeType );
+        ID_SER.toComposite( builder, key.targetId );
+        builder.addLong( key.shardId );
     }
 
 
@@ -56,26 +52,12 @@ public class EdgeRowKeySerializer implements CompositeFieldSerializer<EdgeRowKey
     public EdgeRowKey fromComposite( final CompositeParser composite ) {
 
         final Id sourceId = ID_SER.fromComposite( composite );
+        final String edgeType = composite.readString();
+        final Id targetId = ID_SER.fromComposite( composite );
+        final long shard = composite.readLong();
 
-
-        final int length = composite.readInteger();
-
-        String[] types = new String[length];
-
-        for(int i = 0; i < length; i++){
-            types[i] = composite.readString();
-        }
-
-        return new EdgeRowKey( sourceId, types );
-
+        return new EdgeRowKey( sourceId, edgeType, targetId, shard );
     }
 
 
-
-    /**
-     * Get the singleton serializer
-     */
-    public static EdgeRowKeySerializer get() {
-        return INSTANCE;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
new file mode 100644
index 0000000..3644210
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
@@ -0,0 +1,124 @@
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Iterator;
+
+import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntries;
+
+import com.google.common.base.Optional;
+import com.netflix.astyanax.Serializer;
+import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.util.RangeBuilder;
+
+
+/**
+ * Searcher to be used when performing the search.  Performs I/O transformation as well as parsing for the iterator. If
+ * there are more row keys available to seek, the iterator will return true
+ *
+ * @param <R> The row type
+ * @param <C> The column type
+ * @param <T> The parsed return type
+ */
+public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Iterator<ScopedRowKey<ApplicationScope, R>> {
+
+    protected final Optional<Edge> last;
+    protected final long maxTimestamp;
+    protected final ApplicationScope scope;
+    protected final Iterator<ShardEntries> shards;
+
+
+    protected EdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional<Edge> last,
+                            final Iterator<ShardEntries> shards ) {
+        this.scope = scope;
+        this.maxTimestamp = maxTimestamp;
+        this.last = last;
+        this.shards = shards;
+    }
+
+
+    @Override
+    public boolean hasNext() {
+        return shards.hasNext();
+    }
+
+
+    @Override
+    public ScopedRowKey<ApplicationScope, R> next() {
+        /**
+         * Todo, multi scan
+         */
+        return ScopedRowKey
+                .fromKey( scope, generateRowKey( shards.next().getEntries().iterator().next().getShardIndex() ) );
+    }
+
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException( "Remove is unsupported" );
+    }
+
+
+    /**
+     * Set the range on a search
+     */
+    public void setRange( final RangeBuilder builder ) {
+
+        //set our start range since it was supplied to us
+        if ( last.isPresent() ) {
+            C sourceEdge = getStartColumn( last.get() );
+
+
+            builder.setStart( sourceEdge, getSerializer() );
+        }
+        else {
+
+
+        }
+    }
+
+
+    public boolean hasPage() {
+        return last.isPresent();
+    }
+
+
+    @Override
+    public T parseColumn( final Column<C> column ) {
+        final C edge = column.getName();
+
+        return createEdge( edge, column.getBooleanValue() );
+    }
+
+
+    /**
+     * Get the column's serializer
+     */
+    protected abstract Serializer<C> getSerializer();
+
+
+    /**
+     * Create a row key for this search to use
+     *
+     * @param shard The shard to use in the row key
+     */
+    protected abstract R generateRowKey( final long shard );
+
+
+    /**
+     * Set the start column to begin searching from.  The last is provided
+     */
+    protected abstract C getStartColumn( final Edge last );
+
+
+    /**
+     * Create an edge to return to the user based on the directed edge provided
+     *
+     * @param column The column name
+     * @param marked The marked flag in the column value
+     */
+    protected abstract T createEdge( final C column, final boolean marked );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java
new file mode 100644
index 0000000..d93f679
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java
@@ -0,0 +1,77 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.nio.ByteBuffer;
+
+import org.apache.usergrid.persistence.core.astyanax.IdColDynamicCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Preconditions;
+import com.netflix.astyanax.model.DynamicComposite;
+import com.netflix.astyanax.serializers.AbstractSerializer;
+import com.netflix.astyanax.serializers.LongSerializer;
+
+
+/**
+ * Serializes to a source->target edge Note that we cannot set the edge type on de-serialization.  Only the target
+ * Id and version.
+ */
+public class EdgeSerializer extends AbstractSerializer<DirectedEdge> {
+
+    private static final IdColDynamicCompositeSerializer ID_COL_SERIALIZER = IdColDynamicCompositeSerializer.get();
+    private static final LongSerializer LONG_SERIALIZER = LongSerializer.get();
+
+
+    @Override
+    public ByteBuffer toByteBuffer( final DirectedEdge edge ) {
+
+        DynamicComposite composite = new DynamicComposite();
+
+        composite.addComponent( edge.timestamp, LONG_SERIALIZER );
+
+        ID_COL_SERIALIZER.toComposite( composite, edge.id );
+
+        return composite.serialize();
+    }
+
+
+    @Override
+    public DirectedEdge fromByteBuffer( final ByteBuffer byteBuffer ) {
+        DynamicComposite composite = DynamicComposite.fromByteBuffer( byteBuffer );
+
+        Preconditions.checkArgument( composite.size() == 3, "Composite should have 3 elements" );
+
+
+        //return the version
+        final long timestamp = composite.get( 0, LONG_SERIALIZER );
+
+
+        //parse our id
+        final Id id = ID_COL_SERIALIZER.fromComposite( composite, 1 );
+
+
+        return new DirectedEdge( id, timestamp );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
index 5b7c901..745e02a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
@@ -66,7 +66,8 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
      */
     private static final MultiTennantColumnFamily<ApplicationScope, DirectedRowKey, Long> EDGE_SHARDS =
             new MultiTennantColumnFamily<>( "Edge_Shards",
-                    new OrganizationScopedRowKeySerializer<>( new DirectedEdgeRowKeySerializer() ), LongSerializer.get() );
+                    new OrganizationScopedRowKeySerializer<>( new DirectedEdgeRowKeySerializer() ),
+                    LongSerializer.get() );
 
 
     private static final byte HOLDER = 0x00;
@@ -239,8 +240,6 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
 
             return NodeType.TARGET;
         }
-
-
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index c0a44a9..20cb921 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -32,6 +32,7 @@ import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
@@ -55,17 +56,21 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
     private static final MinShardTimeComparator MIN_SHARD_TIME_COMPARATOR = new MinShardTimeComparator();
 
     private final EdgeShardSerialization edgeShardSerialization;
-//    private final NodeShardCounterSerialization edgeShardCounterSerialization;
+    private final EdgeSerialization edgeSerialization;
+    //    private final NodeShardCounterSerialization edgeShardCounterSerialization;
     private final NodeShardApproximation nodeShardApproximation;
     private final TimeService timeService;
     private final GraphFig graphFig;
     private final Keyspace keyspace;
 
+
     @Inject
     public NodeShardAllocationImpl( final EdgeShardSerialization edgeShardSerialization,
+                                    final EdgeSerialization edgeSerialization,
                                     final NodeShardApproximation nodeShardApproximation, final TimeService timeService,
                                     final GraphFig graphFig, final Keyspace keyspace ) {
         this.edgeShardSerialization = edgeShardSerialization;
+        this.edgeSerialization = edgeSerialization;
         this.nodeShardApproximation = nodeShardApproximation;
         this.timeService = timeService;
         this.graphFig = graphFig;
@@ -74,11 +79,11 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
 
     @Override
-    public Iterator<Shard> getShards( final ApplicationScope scope, final Id nodeId, final NodeType nodeType, final Optional<Shard> maxShardId,
-                                            final String... edgeTypes ) {
+    public Iterator<Shard> getShards( final ApplicationScope scope, final Id nodeId, final NodeType nodeType,
+                                      final Optional<Shard> maxShardId, final String... edgeTypes ) {
 
         final Iterator<Shard> existingShards =
-                edgeShardSerialization.getEdgeMetaData( scope, nodeId,nodeType, maxShardId, edgeTypes );
+                edgeShardSerialization.getEdgeMetaData( scope, nodeId, nodeType, maxShardId, edgeTypes );
 
         final PushbackIterator<Shard> pushbackIterator = new PushbackIterator( existingShards );
 
@@ -95,7 +100,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
             final Shard shard = pushbackIterator.next();
 
             //we're done, our current time uuid is greater than the value stored
-            if ( shard.getCreatedTime() < minConflictTime  ) {
+            if ( shard.getCreatedTime() < minConflictTime ) {
                 //push it back into the iterator
                 pushbackIterator.pushback( shard );
                 break;
@@ -106,7 +111,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
 
         //clean up our future
-        Collections.sort(futures, MIN_SHARD_TIME_COMPARATOR);
+        Collections.sort( futures, MIN_SHARD_TIME_COMPARATOR );
 
 
         //we have more than 1 future value, we need to remove it
@@ -114,10 +119,11 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
         MutationBatch cleanup = keyspace.prepareMutationBatch();
 
         //remove all futures except the last one, it is the only value we shouldn't lazy remove
-        for ( int i = 1; i < futures.size() ; i++ ) {
+        for ( int i = 1; i < futures.size(); i++ ) {
             final Shard toRemove = futures.get( i );
 
-            final MutationBatch batch = edgeShardSerialization.removeEdgeMeta( scope, nodeId, nodeType, toRemove.getShardIndex(), edgeTypes );
+            final MutationBatch batch = edgeShardSerialization
+                    .removeEdgeMeta( scope, nodeId, nodeType, toRemove.getShardIndex(), edgeTypes );
 
             cleanup.mergeShallow( batch );
         }
@@ -131,7 +137,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
         }
 
 
-        final int futuresSize =  futures.size();
+        final int futuresSize = futures.size();
 
         if ( futuresSize > 0 ) {
             pushbackIterator.pushback( futures.get( 0 ) );
@@ -141,8 +147,8 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
         /**
          * Nothing to iterate, return an iterator with 0.
          */
-        if(!pushbackIterator.hasNext()){
-            pushbackIterator.pushback( new Shard(0l, 0l) );
+        if ( !pushbackIterator.hasNext() ) {
+            pushbackIterator.pushback( new Shard( 0l, 0l ) );
         }
 
         return pushbackIterator;
@@ -150,7 +156,8 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
 
     @Override
-    public boolean auditMaxShard( final ApplicationScope scope, final Id nodeId,final NodeType nodeType,  final String... edgeType ) {
+    public boolean auditMaxShard( final ApplicationScope scope, final Id nodeId, final NodeType nodeType,
+                                  final String... edgeType ) {
 
         final Iterator<Shard> maxShards = getShards( scope, nodeId, nodeType, Optional.<Shard>absent(), edgeType );
 
@@ -169,23 +176,32 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
          */
 
 
-        final long count = nodeShardApproximation.getCount( scope, nodeId, nodeType,  maxShard.getShardIndex(), edgeType );
+        final long count =
+                nodeShardApproximation.getCount( scope, nodeId, nodeType, maxShard.getShardIndex(), edgeType );
 
         if ( count < graphFig.getShardSize() ) {
             return false;
         }
 
+
+        /**
+         * TODO, use the EdgeShardStrategy and ShardEdgeSerialization to audit this shard
+         */
+
+        //get the max edge, in this shard, and write it.
+
+
         //try to get a lock here, and fail if one isn't present
 
-//        final long newShardTime = timeService.getCurrentTime() + graphFig.getShardCacheTimeout() * 2;
-//
-//
-//        try {
-//            this.edgeShardSerialization.writeEdgeMeta( scope, nodeId, newShardTime, edgeType ).execute();
-//        }
-//        catch ( ConnectionException e ) {
-//            throw new GraphRuntimeException( "Unable to write the new edge metadata" );
-//        }
+        //        final long newShardTime = timeService.getCurrentTime() + graphFig.getShardCacheTimeout() * 2;
+        //
+        //
+        //        try {
+        //            this.edgeShardSerialization.writeEdgeMeta( scope, nodeId, newShardTime, edgeType ).execute();
+        //        }
+        //        catch ( ConnectionException e ) {
+        //            throw new GraphRuntimeException( "Unable to write the new edge metadata" );
+        //        }
 
 
         return true;
@@ -194,7 +210,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
     @Override
     public long getMinTime() {
-        return timeService.getCurrentTime() - (2 * graphFig.getShardCacheTimeout());
+        return timeService.getCurrentTime() - ( 2 * graphFig.getShardCacheTimeout() );
     }
 
 
@@ -203,16 +219,15 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
      */
     private static final class MinShardTimeComparator implements Comparator<Shard> {
 
-            @Override
-            public int compare( final Shard s1, final Shard s2 ) {
-                int result =  Long.compare( s1.getCreatedTime(), s2.getCreatedTime() );
-
-                if(result == 0){
-                    result = Long.compare( s1.getShardIndex(), s2.getShardIndex() );
-                }
+        @Override
+        public int compare( final Shard s1, final Shard s2 ) {
+            int result = Long.compare( s1.getCreatedTime(), s2.getCreatedTime() );
 
-                return result;
+            if ( result == 0 ) {
+                result = Long.compare( s1.getShardIndex(), s2.getShardIndex() );
             }
-        }
 
+            return result;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index a0bb0ef..73e8d4f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -47,7 +47,6 @@ import com.google.common.cache.LoadingCache;
 import com.google.inject.Inject;
 
 
-
 /**
  * Simple implementation of the shard.  Uses a local Guava shard with a timeout.  If a value is not present in the
  * shard, it will need to be searched via cassandra.
@@ -67,8 +66,8 @@ public class NodeShardCacheImpl implements NodeShardCache {
      */
     @Inject
     public NodeShardCacheImpl( final NodeShardAllocation nodeShardAllocation, final GraphFig graphFig ) {
-        Preconditions.checkNotNull(nodeShardAllocation, "nodeShardAllocation is required");
-        Preconditions.checkNotNull(graphFig, "consistencyFig is required");
+        Preconditions.checkNotNull( nodeShardAllocation, "nodeShardAllocation is required" );
+        Preconditions.checkNotNull( graphFig, "consistencyFig is required" );
 
         this.nodeShardAllocation = nodeShardAllocation;
         this.graphFig = graphFig;
@@ -81,7 +80,8 @@ public class NodeShardCacheImpl implements NodeShardCache {
             public void propertyChange( final PropertyChangeEvent evt ) {
                 final String propertyName = evt.getPropertyName();
 
-                if ( propertyName.equals( GraphFig.SHARD_CACHE_SIZE ) || propertyName.equals( GraphFig.SHARD_CACHE_TIMEOUT ) ) {
+                if ( propertyName.equals( GraphFig.SHARD_CACHE_SIZE ) || propertyName
+                        .equals( GraphFig.SHARD_CACHE_TIMEOUT ) ) {
                     updateCache();
                 }
             }
@@ -95,8 +95,8 @@ public class NodeShardCacheImpl implements NodeShardCache {
 
 
     @Override
-    public ShardEntries getWriteShards( final ApplicationScope scope, final Id nodeId, final NodeType nodeType, final long timestamp,
-                               final String... edgeType ) {
+    public ShardEntries getWriteShards( final ApplicationScope scope, final Id nodeId, final NodeType nodeType,
+                                        final long timestamp, final String... edgeType ) {
 
 
         final CacheKey key = new CacheKey( scope, nodeId, nodeType, edgeType );
@@ -121,27 +121,28 @@ public class NodeShardCacheImpl implements NodeShardCache {
 
 
     @Override
-    public Iterator<ShardEntries> getReadShards( final ApplicationScope scope, final Id nodeId, final NodeType nodeType, final long maxTimestamp,
-                                     final String... edgeType ) {
+    public Iterator<ShardEntries> getReadShards( final ApplicationScope scope, final Id nodeId, final NodeType nodeType,
+                                                 final long maxTimestamp, final String... edgeType ) {
         final CacheKey key = new CacheKey( scope, nodeId, nodeType, edgeType );
-              CacheEntry entry;
+        CacheEntry entry;
 
-              try {
-                  entry = this.graphs.get( key );
-              }
-              catch ( ExecutionException e ) {
-                  throw new GraphRuntimeException( "Unable to load shard key for graph", e );
-              }
+        try {
+            entry = this.graphs.get( key );
+        }
+        catch ( ExecutionException e ) {
+            throw new GraphRuntimeException( "Unable to load shard key for graph", e );
+        }
 
         Iterator<ShardEntries> iterator = entry.getShards( maxTimestamp );
 
-        if(iterator == null){
+        if ( iterator == null ) {
             return Collections.<ShardEntries>emptyList().iterator();
         }
 
         return iterator;
     }
 
+
     /**
      * This is a race condition.  We could re-init the shard while another thread is reading it.  This is fine, the read
      * doesn't have to be precise.  The algorithm accounts for stale data.
@@ -149,27 +150,31 @@ public class NodeShardCacheImpl implements NodeShardCache {
     private void updateCache() {
 
         this.graphs = CacheBuilder.newBuilder().maximumSize( graphFig.getShardCacheSize() )
-                  .expireAfterWrite( graphFig.getShardCacheSize(), TimeUnit.MILLISECONDS )
-                  .build( new CacheLoader<CacheKey, CacheEntry>() {
+                                  .expireAfterWrite( graphFig.getShardCacheSize(), TimeUnit.MILLISECONDS )
+                                  .build( new CacheLoader<CacheKey, CacheEntry>() {
 
 
-                      @Override
-                      public CacheEntry load( final CacheKey key ) throws Exception {
+                                      @Override
+                                      public CacheEntry load( final CacheKey key ) throws Exception {
 
-//
-//                          /**
-//                           * Perform an audit in case we need to allocate a new shard
-//                           */
-//                          nodeShardAllocation.auditMaxShard( key.scope, key.id, key.types );
-//                          //TODO, we need to put some sort of upper bounds on this, it could possibly get too large
+                                          //
+                                          //                          /**
+                                          //                           * Perform an audit in case we need to allocate
+                                          // a new shard
+                                          //                           */
+                                          //                          nodeShardAllocation.auditMaxShard( key.scope,
+                                          // key.id, key.types );
+                                          //                          //TODO, we need to put some sort of upper
+                                          // bounds on this, it could possibly get too large
 
 
-                          final Iterator<Shard> edges = nodeShardAllocation
-                                  .getShards( key.scope, key.id, key.nodeType,  Optional.<Shard>absent(), key.types );
+                                          final Iterator<Shard> edges = nodeShardAllocation
+                                                  .getShards( key.scope, key.id, key.nodeType, Optional.<Shard>absent(),
+                                                          key.types );
 
-                          return new CacheEntry( edges );
-                      }
-                  } );
+                                          return new CacheEntry( edges );
+                                      }
+                                  } );
     }
 
 
@@ -241,7 +246,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
 
 
         private CacheEntry( final Iterator<Shard> shards ) {
-            this.shards = new TreeSet<>( );
+            this.shards = new TreeSet<>();
 
             for ( Shard shard : IterableUtil.wrap( shards ) ) {
                 this.shards.add( shard );
@@ -254,23 +259,21 @@ public class NodeShardCacheImpl implements NodeShardCache {
          */
         public ShardEntries getShardId( final Long seek ) {
             return bootstrapEntry();
-//            return this.shards.floor( seek );
+            //            return this.shards.floor( seek );
         }
 
 
         /**
          * Get all shards <= this one in decending order
-         * @return
          */
-        public Iterator<ShardEntries> getShards( final Long maxShard ){
-            return  Collections.singleton(bootstrapEntry() ).iterator();
-//            return this.shards.headSet(maxShard, true  ).descendingIterator();
+        public Iterator<ShardEntries> getShards( final Long maxShard ) {
+            return Collections.singleton( bootstrapEntry() ).iterator();
+            //            return this.shards.headSet(maxShard, true  ).descendingIterator();
         }
 
-        private ShardEntries bootstrapEntry(){
-            return new ShardEntries( Collections.singleton( new Shard(0l, 0l) ) );
+
+        private ShardEntries bootstrapEntry() {
+            return new ShardEntries( Collections.singleton( new Shard( 0l, 0l ) ) );
         }
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java
new file mode 100644
index 0000000..f901699
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java
@@ -0,0 +1,64 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+
+
+/**
+ * Class to perform serialization for row keys from edges
+ */
+public class RowSerializer implements CompositeFieldSerializer<RowKey> {
+
+    private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+
+    @Override
+    public void toComposite( final CompositeBuilder builder, final RowKey key ) {
+
+        //add the row id to the composite
+        ID_SER.toComposite( builder, key.nodeId );
+
+        builder.addLong( key.hash[0] );
+        builder.addLong( key.hash[1] );
+        builder.addLong( key.shardId );
+    }
+
+
+    @Override
+    public RowKey fromComposite( final CompositeParser composite ) {
+
+        final Id id = ID_SER.fromComposite( composite );
+        final long[] hash = new long[] { composite.readLong(), composite.readLong() };
+        final long shard = composite.readLong();
+
+
+        return new RowKey( id, hash, shard );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java
new file mode 100644
index 0000000..6591d72
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java
@@ -0,0 +1,60 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+
+
+public class RowTypeSerializer implements CompositeFieldSerializer<RowKeyType> {
+
+    private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+
+    @Override
+    public void toComposite( final CompositeBuilder builder, final RowKeyType keyType ) {
+
+        //add the row id to the composite
+        ID_SER.toComposite( builder, keyType.nodeId );
+
+        builder.addLong( keyType.hash[0] );
+        builder.addLong( keyType.hash[1] );
+        builder.addLong( keyType.shardId );
+    }
+
+
+    @Override
+    public RowKeyType fromComposite( final CompositeParser composite ) {
+
+        final Id id = ID_SER.fromComposite( composite );
+        final long[] hash = new long[] { composite.readLong(), composite.readLong() };
+        final long shard = composite.readLong();
+
+        return new RowKeyType( id, hash, shard );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
new file mode 100644
index 0000000..1da85e1
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
@@ -0,0 +1,112 @@
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.model.ConsistencyLevel;
+import com.netflix.astyanax.query.RowQuery;
+import com.netflix.astyanax.util.RangeBuilder;
+
+
+/**
+ * Internal iterator to iterate over multiple row keys
+ *
+ * @param <R> The row type
+ * @param <C> The column type
+ * @param <T> The parsed return type
+ */
+public class ShardRowIterator<R, C, T> implements Iterator<T> {
+
+    private final EdgeSearcher<R, C, T> searcher;
+
+    private final MultiTennantColumnFamily<ApplicationScope, R, C> cf;
+
+    private Iterator<T> currentColumnIterator;
+
+    private final Keyspace keyspace;
+
+    private final int pageSize;
+
+    private final ConsistencyLevel consistencyLevel;
+
+
+    public ShardRowIterator( final EdgeSearcher<R, C, T> searcher,
+                             final MultiTennantColumnFamily<ApplicationScope, R, C> cf, final Keyspace keyspace,
+                             final ConsistencyLevel consistencyLevel, final int pageSize ) {
+        this.searcher = searcher;
+        this.cf = cf;
+        this.keyspace = keyspace;
+        this.pageSize = pageSize;
+        this.consistencyLevel = consistencyLevel;
+    }
+
+
+    @Override
+    public boolean hasNext() {
+        //we have more columns to return
+        if ( currentColumnIterator != null && currentColumnIterator.hasNext() ) {
+            return true;
+        }
+
+        /**
+         * We have another row key, advance to it and re-check
+         */
+        if ( searcher.hasNext() ) {
+            advanceRow();
+            return hasNext();
+        }
+
+        //we have no more columns, and no more row keys, we're done
+        return false;
+    }
+
+
+    @Override
+    public T next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "There are no more rows or columns left to advance" );
+        }
+
+        return currentColumnIterator.next();
+    }
+
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException( "Remove is unsupported" );
+    }
+
+
+    /**
+     * Advance our iterator to the next row (assumes the check for row keys is elsewhere)
+     */
+    private void advanceRow() {
+
+        /**
+         * If the edge is present, we need to being seeking from this
+         */
+
+        final RangeBuilder rangeBuilder = new RangeBuilder().setLimit( pageSize );
+
+
+        //set the range into the search
+        searcher.setRange( rangeBuilder );
+
+        final ScopedRowKey<ApplicationScope, R> rowKey = searcher.next();
+
+
+        RowQuery<ScopedRowKey<ApplicationScope, R>, C> query =
+                keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKey( rowKey )
+                        .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
+
+
+        currentColumnIterator = new ColumnNameIterator<C, T>( query, searcher, searcher.hasPage() );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
new file mode 100644
index 0000000..a0d1e6e
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
@@ -0,0 +1,568 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import javax.inject.Inject;
+
+import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.ValidationUtils;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.SearchByIdType;
+import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntries;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.util.EdgeUtils;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.Serializer;
+import com.netflix.astyanax.util.RangeBuilder;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
+@Singleton
+public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
+
+    protected final Keyspace keyspace;
+    protected final CassandraConfig cassandraConfig;
+    protected final GraphFig graphFig;
+    protected final EdgeShardStrategy writeEdgeShardStrategy;
+
+
+    @Inject
+    public ShardedEdgeSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
+                                         final GraphFig graphFig, final EdgeShardStrategy writeEdgeShardStrategy ) {
+
+        checkNotNull( "keyspace required", keyspace );
+        checkNotNull( "cassandraConfig required", cassandraConfig );
+        checkNotNull( "consistencyFig required", graphFig );
+        checkNotNull( "writeEdgeShardStrategy required", writeEdgeShardStrategy );
+
+
+        this.keyspace = keyspace;
+        this.cassandraConfig = cassandraConfig;
+        this.graphFig = graphFig;
+        this.writeEdgeShardStrategy = writeEdgeShardStrategy;
+    }
+
+
+    @Override
+    public MutationBatch writeEdge( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                    final MarkedEdge markedEdge, final UUID timestamp ) {
+        ValidationUtils.validateApplicationScope( scope );
+        EdgeUtils.validateEdge( markedEdge );
+        ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+
+        final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() )
+                                            .withTimestamp( timestamp.timestamp() );
+
+        final boolean isDeleted = markedEdge.isDeleted();
+
+
+        doWrite( columnFamilies, scope, markedEdge, new RowOp<RowKey>() {
+            @Override
+            public void writeEdge( final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily,
+                                   final RowKey rowKey, final DirectedEdge edge ) {
+                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( edge, isDeleted );
+            }
+
+
+            @Override
+            public void countEdge( final Id rowId, final NodeType nodeType, final long shardId,
+                                   final String... types ) {
+                if ( !isDeleted ) {
+                    writeEdgeShardStrategy.increment( scope, rowId, nodeType, shardId, 1l, types );
+                }
+            }
+
+
+            @Override
+            public void writeVersion( final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily,
+                                      final EdgeRowKey rowKey, final long timestamp ) {
+                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( timestamp, isDeleted );
+            }
+        } );
+
+
+        return batch;
+    }
+
+
+    @Override
+    public MutationBatch deleteEdge( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                     final MarkedEdge markedEdge, final UUID timestamp ) {
+        ValidationUtils.validateApplicationScope( scope );
+        EdgeUtils.validateEdge( markedEdge );
+        ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+
+        final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() )
+                                            .withTimestamp( timestamp.timestamp() );
+
+
+        doWrite( columnFamilies, scope, markedEdge, new RowOp<RowKey>() {
+            @Override
+            public void writeEdge( final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily,
+                                   final RowKey rowKey, final DirectedEdge edge ) {
+                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( edge );
+            }
+
+
+            @Override
+            public void countEdge( final Id rowId, final NodeType nodeType, final long shardId,
+                                   final String... types ) {
+                writeEdgeShardStrategy.increment( scope, rowId, nodeType, shardId, -1, types );
+            }
+
+
+            @Override
+            public void writeVersion( final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily,
+                                      final EdgeRowKey rowKey, final long timestamp ) {
+                batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( timestamp );
+            }
+        } );
+
+
+        return batch;
+    }
+
+
+    /**
+     * EdgeWrite the edges internally
+     *
+     * @param scope The scope to encapsulate
+     * @param edge The edge to write
+     * @param op The row operation to invoke
+     */
+    private void doWrite( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope, final MarkedEdge edge,
+                          final RowOp op ) {
+        ValidationUtils.validateApplicationScope( scope );
+        EdgeUtils.validateEdge( edge );
+
+        final Id sourceNodeId = edge.getSourceNode();
+        final String sourceNodeType = sourceNodeId.getType();
+        final Id targetNodeId = edge.getTargetNode();
+        final String targetNodeType = targetNodeId.getType();
+        final long timestamp = edge.getTimestamp();
+        final String type = edge.getType();
+
+
+        /**
+         * Key in the serializers based on the edge
+         */
+
+
+        /**
+         * write edges from source->target
+         */
+
+
+        final DirectedEdge sourceEdge = new DirectedEdge( targetNodeId, timestamp );
+
+        final ShardEntries sourceRowKeyShard =
+                writeEdgeShardStrategy.getWriteShards( scope, sourceNodeId, NodeType.SOURCE, timestamp, type );
+
+        final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> sourceCf =
+                columnFamilies.getSourceNodeCfName();
+
+
+        for ( Shard shard : sourceRowKeyShard.getEntries() ) {
+
+            final long shardId = shard.getShardIndex();
+            final RowKey sourceRowKey = new RowKey( sourceNodeId, type, shardId );
+            op.writeEdge( sourceCf, sourceRowKey, sourceEdge );
+            op.countEdge( sourceNodeId, NodeType.SOURCE, shardId, type );
+        }
+
+
+        final ShardEntries sourceWithTypeRowKeyShard = writeEdgeShardStrategy
+                .getWriteShards( scope, sourceNodeId, NodeType.SOURCE, timestamp, type, targetNodeType );
+
+        final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> targetCf =
+                columnFamilies.getSourceNodeTargetTypeCfName();
+
+        for ( Shard shard : sourceWithTypeRowKeyShard.getEntries() ) {
+
+            final long shardId = shard.getShardIndex();
+            final RowKeyType sourceRowKeyType = new RowKeyType( sourceNodeId, type, targetNodeId, shardId );
+
+            op.writeEdge( targetCf, sourceRowKeyType, sourceEdge );
+            op.countEdge( sourceNodeId, NodeType.SOURCE, shardId, type, targetNodeType );
+        }
+
+
+        /**
+         * write edges from target<-source
+         */
+
+        final DirectedEdge targetEdge = new DirectedEdge( sourceNodeId, timestamp );
+
+
+        final ShardEntries targetRowKeyShard =
+                writeEdgeShardStrategy.getWriteShards( scope, targetNodeId, NodeType.TARGET, timestamp, type );
+
+        final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> sourceByTargetCf =
+                columnFamilies.getTargetNodeCfName();
+
+        for ( Shard shard : targetRowKeyShard.getEntries() ) {
+            final long shardId = shard.getShardIndex();
+            final RowKey targetRowKey = new RowKey( targetNodeId, type, shardId );
+
+            op.writeEdge( sourceByTargetCf, targetRowKey, targetEdge );
+            op.countEdge( targetNodeId, NodeType.TARGET, shardId, type );
+        }
+
+
+        final ShardEntries targetWithTypeRowKeyShard = writeEdgeShardStrategy
+                .getWriteShards( scope, targetNodeId, NodeType.TARGET, timestamp, type, sourceNodeType );
+
+        final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> targetBySourceCf =
+                columnFamilies.getTargetNodeSourceTypeCfName();
+
+
+        for ( Shard shard : targetWithTypeRowKeyShard.getEntries() ) {
+
+            final long shardId = shard.getShardIndex();
+
+            final RowKeyType targetRowKeyType = new RowKeyType( targetNodeId, type, sourceNodeId, shardId );
+
+
+            op.writeEdge( targetBySourceCf, targetRowKeyType, targetEdge );
+            op.countEdge( targetNodeId, NodeType.TARGET, shardId, type, sourceNodeType );
+        }
+
+        /**
+         * Always a 0l shard, we're hard limiting 2b timestamps for the same edge
+         */
+        final EdgeRowKey edgeRowKey = new EdgeRowKey( sourceNodeId, type, targetNodeId, 0l );
+
+
+        /**
+         * Write this in the timestamp log for this edge of source->target
+         */
+        op.writeVersion( columnFamilies.getGraphEdgeVersions(), edgeRowKey, timestamp );
+    }
+
+
+    @Override
+    public Iterator<MarkedEdge> getEdgeVersions( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                                 final SearchByEdge search, final Iterator<ShardEntries> shards ) {
+        ValidationUtils.validateApplicationScope( scope );
+        EdgeUtils.validateSearchByEdge( search );
+
+        final Id targetId = search.targetNode();
+        final Id sourceId = search.sourceNode();
+        final String type = search.getType();
+        final long maxTimestamp = search.getMaxTimestamp();
+        final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily =
+                columnFamilies.getGraphEdgeVersions();
+        final Serializer<Long> serializer = columnFamily.getColumnSerializer();
+
+        final EdgeSearcher<EdgeRowKey, Long, MarkedEdge> searcher =
+                new EdgeSearcher<EdgeRowKey, Long, MarkedEdge>( scope, maxTimestamp, search.last(), shards ) {
+
+                    @Override
+                    protected Serializer<Long> getSerializer() {
+                        return serializer;
+                    }
+
+
+                    @Override
+                    public void setRange( final RangeBuilder builder ) {
+
+
+                        if ( last.isPresent() ) {
+                            super.setRange( builder );
+                            return;
+                        }
+
+                        //start seeking at a value < our max version
+                        builder.setStart( maxTimestamp );
+                    }
+
+
+                    @Override
+                    protected EdgeRowKey generateRowKey( long shard ) {
+                        return new EdgeRowKey( sourceId, type, targetId, shard );
+                    }
+
+
+                    @Override
+                    protected Long getStartColumn( final Edge last ) {
+                        return last.getTimestamp();
+                    }
+
+
+                    @Override
+                    protected MarkedEdge createEdge( final Long column, final boolean marked ) {
+                        return new SimpleMarkedEdge( sourceId, type, targetId, column.longValue(), marked );
+                    }
+                };
+
+        return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+                graphFig.getScanPageSize() );
+    }
+
+
+    @Override
+    public Iterator<MarkedEdge> getEdgesFromSource( final EdgeColumnFamilies columnFamilies,
+                                                    final ApplicationScope scope, final SearchByEdgeType edgeType,
+                                                    final Iterator<ShardEntries> shards ) {
+
+        ValidationUtils.validateApplicationScope( scope );
+        EdgeUtils.validateSearchByEdgeType( edgeType );
+
+        final Id sourceId = edgeType.getNode();
+        final String type = edgeType.getType();
+        final long maxTimestamp = edgeType.getMaxTimestamp();
+        final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily =
+                columnFamilies.getSourceNodeCfName();
+        final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
+
+
+        final EdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
+                new EdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+
+
+                    @Override
+                    protected Serializer<DirectedEdge> getSerializer() {
+                        return serializer;
+                    }
+
+
+                    @Override
+                    protected RowKey generateRowKey( long shard ) {
+                        return new RowKey( sourceId, type, shard );
+                    }
+
+
+                    @Override
+                    protected DirectedEdge getStartColumn( final Edge last ) {
+                        return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
+                    }
+
+
+                    @Override
+                    protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
+                        return new SimpleMarkedEdge( sourceId, type, edge.id, edge.timestamp, marked );
+                    }
+                };
+
+
+        return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+                graphFig.getScanPageSize() );
+    }
+
+
+    @Override
+    public Iterator<MarkedEdge> getEdgesFromSourceByTargetType( final EdgeColumnFamilies columnFamilies,
+                                                                final ApplicationScope scope,
+                                                                final SearchByIdType edgeType,
+                                                                final Iterator<ShardEntries> shards ) {
+
+        ValidationUtils.validateApplicationScope( scope );
+        EdgeUtils.validateSearchByEdgeType( edgeType );
+
+        final Id targetId = edgeType.getNode();
+        final String type = edgeType.getType();
+        final String targetType = edgeType.getIdType();
+        final long maxTimestamp = edgeType.getMaxTimestamp();
+        final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily =
+                columnFamilies.getSourceNodeTargetTypeCfName();
+        final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
+
+
+        final EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
+                new EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+
+                    @Override
+                    protected Serializer<DirectedEdge> getSerializer() {
+                        return serializer;
+                    }
+
+
+                    @Override
+                    protected RowKeyType generateRowKey( long shard ) {
+                        return new RowKeyType( targetId, type, targetType, shard );
+                    }
+
+
+                    @Override
+                    protected DirectedEdge getStartColumn( final Edge last ) {
+                        return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
+                    }
+
+
+                    @Override
+                    protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
+                        return new SimpleMarkedEdge( targetId, type, edge.id, edge.timestamp, marked );
+                    }
+                };
+
+        return new ShardRowIterator( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+                graphFig.getScanPageSize() );
+    }
+
+
+    @Override
+    public Iterator<MarkedEdge> getEdgesToTarget( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                                  final SearchByEdgeType edgeType,
+                                                  final Iterator<ShardEntries> shards ) {
+        ValidationUtils.validateApplicationScope( scope );
+        EdgeUtils.validateSearchByEdgeType( edgeType );
+
+        final Id targetId = edgeType.getNode();
+        final String type = edgeType.getType();
+        final long maxTimestamp = edgeType.getMaxTimestamp();
+        final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily =
+                columnFamilies.getTargetNodeCfName();
+        final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
+
+        final EdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
+                new EdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+
+                    @Override
+                    protected Serializer<DirectedEdge> getSerializer() {
+                        return serializer;
+                    }
+
+
+                    @Override
+                    protected RowKey generateRowKey( long shard ) {
+                        return new RowKey( targetId, type, shard );
+                    }
+
+
+                    @Override
+                    protected DirectedEdge getStartColumn( final Edge last ) {
+                        return new DirectedEdge( last.getSourceNode(), last.getTimestamp() );
+                    }
+
+
+                    @Override
+                    protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
+                        return new SimpleMarkedEdge( edge.id, type, targetId, edge.timestamp, marked );
+                    }
+                };
+
+
+        return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+                graphFig.getScanPageSize() );
+    }
+
+
+    @Override
+    public Iterator<MarkedEdge> getEdgesToTargetBySourceType( final EdgeColumnFamilies columnFamilies,
+                                                              final ApplicationScope scope,
+                                                              final SearchByIdType edgeType,
+                                                              final Iterator<ShardEntries> shards ) {
+
+        ValidationUtils.validateApplicationScope( scope );
+        EdgeUtils.validateSearchByEdgeType( edgeType );
+
+        final Id targetId = edgeType.getNode();
+        final String sourceType = edgeType.getIdType();
+        final String type = edgeType.getType();
+        final long maxTimestamp = edgeType.getMaxTimestamp();
+        final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily =
+                columnFamilies.getTargetNodeSourceTypeCfName();
+        final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
+
+
+        final EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
+                new EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+                    @Override
+                    protected Serializer<DirectedEdge> getSerializer() {
+                        return serializer;
+                    }
+
+
+                    @Override
+                    protected RowKeyType generateRowKey( final long shard ) {
+                        return new RowKeyType( targetId, type, sourceType, shard );
+                    }
+
+
+                    @Override
+                    protected DirectedEdge getStartColumn( final Edge last ) {
+                        return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
+                    }
+
+
+                    @Override
+                    protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
+                        return new SimpleMarkedEdge( edge.id, type, targetId, edge.timestamp, marked );
+                    }
+                };
+
+        return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+                graphFig.getScanPageSize() );
+    }
+
+
+    /**
+     * Simple callback to perform puts and deletes with a common row setup code
+     *
+     * @param <R> The row key type
+     */
+    private static interface RowOp<R> {
+
+        /**
+         * Write the edge with the given data
+         */
+        void writeEdge( final MultiTennantColumnFamily<ApplicationScope, R, DirectedEdge> columnFamily, R rowKey,
+                        DirectedEdge edge );
+
+        /**
+         * Perform the count on the edge
+         */
+        void countEdge( final Id rowId, NodeType type, long shardId, String... types );
+
+        /**
+         * Write the edge into the version cf
+         */
+        void writeVersion( final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily,
+                           EdgeRowKey rowKey, long timestamp );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
new file mode 100644
index 0000000..9050b0a
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
@@ -0,0 +1,150 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.DynamicCompositeType;
+
+import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.astyanax.OrganizationScopedRowKeySerializer;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
+
+import com.netflix.astyanax.serializers.LongSerializer;
+
+import static org.apache.usergrid.persistence.core.astyanax.ColumnTypes.LONG_TYPE_REVERSED;
+import static org.apache.usergrid.persistence.core.astyanax.ColumnTypes.UUID_TYPE_REVERSED;
+
+
+/**
+ * Implementation of size based column family
+ */
+public class SizebasedEdgeColumnFamilies implements EdgeColumnFamilies {
+
+
+    //Row key with no type
+    private static final RowSerializer ROW_SERIALIZER = new RowSerializer();
+
+    //row key with target id type
+    private static final RowTypeSerializer ROW_TYPE_SERIALIZER = new RowTypeSerializer();
+
+    private static final EdgeRowKeySerializer EDGE_ROW_KEY_SERIALIZER = new EdgeRowKeySerializer();
+
+    //Edge serializers
+    private static final EdgeSerializer EDGE_SERIALIZER = new EdgeSerializer();
+
+    private static final LongSerializer LONG_SERIALIZER = LongSerializer.get();
+
+    private static final String EDGE_DYNAMIC_COMPOSITE_TYPE =
+            //we purposefully associate lower case "l" and "u" with reversed types.  This way we can use
+            //the default serialization in Astayanax, but get reverse order in cassandra
+            DynamicCompositeType.class.getSimpleName() + "(s=>UTF8Type,l=>" + LONG_TYPE_REVERSED + ",u=>"
+                    + UUID_TYPE_REVERSED + ")";
+
+
+    //initialize the CF's from our implementation
+    private static final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> SOURCE_NODE_EDGES =
+            new MultiTennantColumnFamily<>( "Graph_Source_Node_Edges",
+                    new OrganizationScopedRowKeySerializer<>( ROW_SERIALIZER ), EDGE_SERIALIZER );
+
+
+    private static final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> TARGET_NODE_EDGES =
+            new MultiTennantColumnFamily<>( "Graph_Target_Node_Edges",
+                    new OrganizationScopedRowKeySerializer<>( ROW_SERIALIZER ), EDGE_SERIALIZER );
+
+
+    private static final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> SOURCE_NODE_TARGET_TYPE =
+            new MultiTennantColumnFamily<>( "Graph_Source_Node_Target_Type",
+                    new OrganizationScopedRowKeySerializer<>( ROW_TYPE_SERIALIZER ), EDGE_SERIALIZER );
+
+
+    /**
+     * The edges that are to the target node with the source type.  The target node is the row key
+     */
+    private static final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> TARGET_NODE_SOURCE_TYPE =
+            new MultiTennantColumnFamily<>( "Graph_Target_Node_Source_Type",
+                    new OrganizationScopedRowKeySerializer<>( ROW_TYPE_SERIALIZER ), EDGE_SERIALIZER );
+
+
+    private static final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> EDGE_VERSIONS =
+            new MultiTennantColumnFamily<>( "Graph_Edge_Versions",
+                    new OrganizationScopedRowKeySerializer<>( EDGE_ROW_KEY_SERIALIZER ), LONG_SERIALIZER );
+
+
+    @Override
+    public MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> getSourceNodeCfName() {
+        return SOURCE_NODE_EDGES;
+    }
+
+
+    @Override
+    public MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> getTargetNodeCfName() {
+        return TARGET_NODE_EDGES;
+    }
+
+
+    @Override
+    public MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> getSourceNodeTargetTypeCfName() {
+        return SOURCE_NODE_TARGET_TYPE;
+    }
+
+
+    @Override
+    public MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> getTargetNodeSourceTypeCfName() {
+        return TARGET_NODE_SOURCE_TYPE;
+    }
+
+
+    @Override
+    public MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> getGraphEdgeVersions() {
+        return EDGE_VERSIONS;
+    }
+
+
+    @Override
+    public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
+        return Arrays
+                .asList( graphCf( SOURCE_NODE_EDGES ), graphCf( TARGET_NODE_EDGES ), graphCf( SOURCE_NODE_TARGET_TYPE ),
+                        graphCf( TARGET_NODE_SOURCE_TYPE ),
+                        new MultiTennantColumnFamilyDefinition( EDGE_VERSIONS, BytesType.class.getSimpleName(),
+                                ColumnTypes.LONG_TYPE_REVERSED, BytesType.class.getSimpleName(),
+                                MultiTennantColumnFamilyDefinition.CacheOption.KEYS ) );
+    }
+
+
+    /**
+     * Helper to generate an edge definition by the type
+     */
+    private MultiTennantColumnFamilyDefinition graphCf( MultiTennantColumnFamily cf ) {
+        return new MultiTennantColumnFamilyDefinition( cf, BytesType.class.getSimpleName(), EDGE_DYNAMIC_COMPOSITE_TYPE,
+                BytesType.class.getSimpleName(), MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
index 7a55c53..a71960b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
@@ -54,52 +54,23 @@ public class SizebasedEdgeShardStrategy implements EdgeShardStrategy {
 
 
     @Override
-    public ShardEntries getWriteShards( final ApplicationScope scope, final Id rowKeyId,  final NodeType nodeType, final long timestamp,
-                                final String... types ) {
+    public ShardEntries getWriteShards( final ApplicationScope scope, final Id rowKeyId, final NodeType nodeType,
+                                        final long timestamp, final String... types ) {
         return shardCache.getWriteShards( scope, rowKeyId, nodeType, timestamp, types );
     }
 
 
     @Override
-    public Iterator<ShardEntries> getReadShards( final ApplicationScope scope, final Id rowKeyId,   final NodeType nodeType,final long maxTimestamp,
-                                         final String... types ) {
+    public Iterator<ShardEntries> getReadShards( final ApplicationScope scope, final Id rowKeyId,
+                                                 final NodeType nodeType, final long maxTimestamp,
+                                                 final String... types ) {
         return shardCache.getReadShards( scope, rowKeyId, nodeType, maxTimestamp, types );
     }
 
 
     @Override
-    public void increment( final ApplicationScope scope, final Id rowKeyId,  final NodeType nodeType, final long shardId, final long count,
-                           final String... types ) {
+    public void increment( final ApplicationScope scope, final Id rowKeyId, final NodeType nodeType, final long shardId,
+                           final long count, final String... types ) {
         shardApproximation.increment( scope, rowKeyId, nodeType, shardId, count, types );
     }
-
-
-    @Override
-    public String getSourceNodeCfName() {
-        return "Graph_Source_Node_Edges";
-    }
-
-
-    @Override
-    public String getTargetNodeCfName() {
-        return "Graph_Target_Node_Edges";
-    }
-
-
-    @Override
-    public String getSourceNodeTargetTypeCfName() {
-        return "Graph_Source_Node_Target_Type";
-    }
-
-
-    @Override
-    public String getTargetNodeSourceTypeCfName() {
-        return "Graph_Target_Node_Source_Type";
-    }
-
-
-    @Override
-    public String getGraphEdgeVersions() {
-        return "Graph_Edge_Versions";
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SourceEdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SourceEdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SourceEdgeSearcher.java
new file mode 100644
index 0000000..b33fcaf
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SourceEdgeSearcher.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+public class SourceEdgeSearcher {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
index adcb42e..501cb83 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
@@ -116,6 +116,8 @@ public class GraphManagerShardingIT {
         }
 
 
+
+
         long shardCount = nodeShardApproximation.getCount( scope, sourceId, NodeType.SOURCE,  0l, edgeType );
 
         assertEquals("Shard count for source node should be the same as write count", writeCount, shardCount);