You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/28 05:36:54 UTC

[3/5] Updated OrderedMerge to use a faster implementation at runtime. After initialization, it's an O(1) emit operation as long as our producers are fast enough.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java
deleted file mode 100644
index 2edea56..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing,
- *  * software distributed under the License is distributed on an
- *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  * KIND, either express or implied.  See the License for the
- *  * specific language governing permissions and limitations
- *  * under the License.
- *
- */
-
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-
-
-import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
-import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.netflix.astyanax.model.CompositeBuilder;
-import com.netflix.astyanax.model.CompositeParser;
-
-
-public class RowTypeSerializer implements CompositeFieldSerializer<RowKeyType> {
-
-    private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
-
-
-    @Override
-    public void toComposite( final CompositeBuilder builder, final RowKeyType keyType ) {
-
-        //add the row id to the composite
-        ID_SER.toComposite( builder, keyType.nodeId );
-
-        builder.addString( keyType.edgeType );
-        builder.addString( keyType.idType );
-
-        builder.addLong( keyType.shardId );
-    }
-
-
-    @Override
-    public RowKeyType fromComposite( final CompositeParser composite ) {
-
-        final Id id = ID_SER.fromComposite( composite );
-        final String edgeType = composite.readString();
-        final String idType = composite.readString();
-        final long shard = composite.readLong();
-
-        return new RowKeyType( id, edgeType, idType, shard);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
index c8a884b..f1b5108 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
@@ -6,32 +6,51 @@ import java.util.NoSuchElementException;
 
 import org.apache.commons.collections4.iterators.PushbackIterator;
 
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
 
 import com.google.common.base.Preconditions;
 
+import rx.schedulers.Schedulers;
+
 
 /**
- * Utility class that will take an iterator of all shards, and combine them into an iterator
- * of ShardEntryGroups.  These groups can then be used in a distributed system to handle concurrent reads and writes
+ * Utility class that will take an iterator of all shards, and combine them into an iterator of ShardEntryGroups.  These
+ * groups can then be used in a distributed system to handle concurrent reads and writes
  */
 public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
 
 
-    private ShardEntryGroup next;
+    private final ShardGroupCompaction shardGroupCompaction;
     private final PushbackIterator<Shard> sourceIterator;
     private final long minDelta;
+    private final ApplicationScope scope;
+    private final DirectedEdgeMeta directedEdgeMeta;
+
+
+    private ShardEntryGroup next;
 
 
     /**
      * Create a shard iterator
-     * @param shardIterator The iterator of all shards.  Order is expected to be by the  shard index from Long.MAX to Long.MIN
+     *
+     * @param shardIterator The iterator of all shards.  Order is expected to be by the  shard index from Long.MAX to
+     * Long.MIN
      * @param minDelta The minimum delta we allow to consider shards the same group
      */
-    public ShardEntryGroupIterator( final Iterator<Shard> shardIterator, final long minDelta ) {
-        Preconditions.checkArgument(shardIterator.hasNext(), "Shard iterator must have shards present");
+    public ShardEntryGroupIterator( final Iterator<Shard> shardIterator, final long minDelta,
+                                    final ShardGroupCompaction shardGroupCompaction, final ApplicationScope scope,
+                                    final DirectedEdgeMeta directedEdgeMeta ) {
+
+
+        Preconditions.checkArgument( shardIterator.hasNext(), "Shard iterator must have shards present" );
+        this.scope = scope;
+        this.directedEdgeMeta = directedEdgeMeta;
         this.sourceIterator = new PushbackIterator( shardIterator );
+        this.shardGroupCompaction = shardGroupCompaction;
         this.minDelta = minDelta;
     }
 
@@ -78,7 +97,7 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
          */
         while ( sourceIterator.hasNext() ) {
 
-            if(next == null){
+            if ( next == null ) {
                 next = new ShardEntryGroup( minDelta );
             }
 
@@ -92,9 +111,13 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
 
 
             sourceIterator.pushback( shard );
+
             break;
         }
 
-
+        //now perform the audit (maybe)
+        if(next != null) {
+            shardGroupCompaction.evaluateShardGroup( scope, directedEdgeMeta, next );
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index c566d43..5076424 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -23,21 +23,32 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 
 import java.nio.charset.Charset;
-import java.util.BitSet;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
-import org.apache.usergrid.persistence.core.rx.ObservableIterator;
+import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
@@ -47,20 +58,22 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntry
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
 import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.google.common.base.Preconditions;
-import com.google.common.hash.HashCode;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
+import com.google.common.hash.PrimitiveSink;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-
-import rx.Observable;
-import rx.functions.Action0;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
 
 
 /**
@@ -70,27 +83,33 @@ import rx.schedulers.Schedulers;
 public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
 
+    private static final Logger LOG = LoggerFactory.getLogger( ShardGroupCompactionImpl.class );
+
+
     private static final Charset CHARSET = Charset.forName( "UTF-8" );
 
     private static final HashFunction MURMUR_128 = Hashing.murmur3_128();
 
+
+    private final ListeningExecutorService executorService;
     private final TimeService timeService;
     private final GraphFig graphFig;
     private final NodeShardAllocation nodeShardAllocation;
     private final ShardedEdgeSerialization shardedEdgeSerialization;
     private final EdgeColumnFamilies edgeColumnFamilies;
+    private final Keyspace keyspace;
     private final EdgeShardSerialization edgeShardSerialization;
 
-
     private final Random random;
     private final ShardCompactionTaskTracker shardCompactionTaskTracker;
+    private final ShardAuditTaskTracker shardAuditTaskTracker;
 
 
     @Inject
     public ShardGroupCompactionImpl( final TimeService timeService, final GraphFig graphFig,
                                      final NodeShardAllocation nodeShardAllocation,
                                      final ShardedEdgeSerialization shardedEdgeSerialization,
-                                     final EdgeColumnFamilies edgeColumnFamilies,
+                                     final EdgeColumnFamilies edgeColumnFamilies, final Keyspace keyspace,
                                      final EdgeShardSerialization edgeShardSerialization ) {
 
         this.timeService = timeService;
@@ -98,157 +117,343 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         this.nodeShardAllocation = nodeShardAllocation;
         this.shardedEdgeSerialization = shardedEdgeSerialization;
         this.edgeColumnFamilies = edgeColumnFamilies;
+        this.keyspace = keyspace;
         this.edgeShardSerialization = edgeShardSerialization;
 
         this.random = new Random();
         this.shardCompactionTaskTracker = new ShardCompactionTaskTracker();
+        this.shardAuditTaskTracker = new ShardAuditTaskTracker();
+
+        executorService = MoreExecutors.listeningDecorator(
+                new MaxSizeThreadPool( graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize() ) );
     }
 
 
-    @Override
-    public Set<Shard> compact( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
-                               final ShardEntryGroup group ) {
+    /**
+     * Execute the compaction task.  Will return the status the operations performed
+     *
+     * @param group The shard entry group to compact
+     *
+     * @return The result of the compaction operation
+     */
+    public CompactionResult compact( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+                                     final ShardEntryGroup group ) {
+
+
         final long startTime = timeService.getCurrentTime();
 
+
         Preconditions.checkNotNull( group, "group cannot be null" );
         Preconditions.checkArgument( group.isCompactionPending(), "Compaction is pending" );
-        Preconditions.checkArgument( group.shouldCompact( startTime ), "Compaction can now be run" );
+        Preconditions.checkArgument( group.shouldCompact( startTime ),
+                "Compaction cannot be run yet.  Ignoring compaction." );
 
-        /**
-         * It's already compacting, don't do anything
-         */
-        if (!shardCompactionTaskTracker.shouldStartCompaction( scope, edgeMeta, group )){
-            return Collections.emptySet();
-        }
 
+        final CompactionResult.CompactionBuilder resultBuilder = CompactionResult.builder();
 
         final Shard targetShard = group.getCompactionTarget();
 
-        final Collection<Shard> sourceShards = group.getReadShards();
+        final Set<Shard> sourceShards = new HashSet<>( group.getReadShards() );
 
+        //remove the target
+        sourceShards.remove( targetShard );
 
 
-        Observable.create( new ObservableIterator<MarkedEdge>( "Shard_Repair" ) {
-            @Override
-            protected Iterator<MarkedEdge> getIterator() {
-                return edgeMeta.loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, group.getReadShards(), Long.MAX_VALUE );
-            }
-        } ).buffer( graphFig.getScanPageSize() ).doOnNext( new Action1<List<MarkedEdge>>() {
-            @Override
-            public void call( final List<MarkedEdge> markedEdges ) {
+        final UUID timestamp = UUIDGenerator.newTimeUUID();
 
-            }
-        }).doOnNext( new Action1<List<MarkedEdge>>() {
-            @Override
-            public void call( final List<MarkedEdge> markedEdges ) {
+        final long newShardPivot = targetShard.getShardIndex();
 
-            }
-        } );
+        final int maxWorkSize = graphFig.getScanPageSize();
 
 
+        final MutationBatch newRowBatch = keyspace.prepareMutationBatch();
+        final MutationBatch deleteRowBatch = keyspace.prepareMutationBatch();
 
+        /**
+         * As we move edges, we want to keep track of it
+         */
+        long edgeCount = 0;
 
 
-        return null;
-    }
+        for ( Shard sourceShard : sourceShards ) {
+            Iterator<MarkedEdge> edges = edgeMeta.loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope,
+                    Collections.singleton( sourceShard ), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING );
 
+            while ( edges.hasNext() ) {
+                final MarkedEdge edge = edges.next();
 
-    @Override
-    public AuditResult evaluateShardGroup( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
-                                           final ShardEntryGroup group ) {
+                final long edgeTimestamp = edge.getTimestamp();
 
+                /**
+                 * The edge is within a different shard, break
+                 */
+                if ( edgeTimestamp < newShardPivot ) {
+                    break;
+                }
 
-        final double repairChance = random.nextDouble();
 
-        //don't repair
-        if ( repairChance > graphFig.getShardRepairChance() ) {
-            return AuditResult.NOT_CHECKED;
+                newRowBatch.mergeShallow(
+                        edgeMeta.writeEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, targetShard, edge,
+                                timestamp ) );
+
+                deleteRowBatch.mergeShallow(
+                        edgeMeta.deleteEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, sourceShard, edge,
+                                timestamp ) );
+
+                edgeCount++;
+
+                //if we're at our count, execute the mutation of writing the edges to the new row, then remove them
+                //from the old rows
+                if ( edgeCount % maxWorkSize == 0 ) {
+
+                    try {
+                        HystrixCassandra.async( newRowBatch );
+                        HystrixCassandra.async( deleteRowBatch );
+                    }
+                    catch ( Throwable t ) {
+                        LOG.error( "Unable to move edges from shard {} to shard {}", sourceShard, targetShard );
+                    }
+                }
+            }
         }
 
 
+        try {
+            HystrixCassandra.async( newRowBatch );
+            HystrixCassandra.async( deleteRowBatch );
+        }
+        catch ( Throwable t ) {
+            LOG.error( "Unable to move edges to target shard {}", targetShard );
+        }
+
+
+        LOG.info( "Finished compacting {} shards and moved {} edges", sourceShards, edgeCount );
+
+        resultBuilder.withCopiedEdges( edgeCount ).withSourceShards( sourceShards ).withTargetShard( targetShard );
+
         /**
-         * We don't have a compaction pending.  Run an audit on the shards
+         * We didn't move anything this pass, mark the shard as compacted.  If we move something, it means that we missed it on the first pass
+         * or someone is still not writing to the target shard only.
          */
-        if ( !group.isCompactionPending() ) {
+        if ( edgeCount == 0 ) {
 
-            /**
-             * Check if we should allocate, we may want to
-             */
 
+            //now that we've marked our target as compacted, we can successfully remove any shards that are not
+            // compacted themselves in the sources
+
+            final MutationBatch shardRemovalRollup = keyspace.prepareMutationBatch();
+
+            for ( Shard source : sourceShards ) {
+
+                //if we can't safely delete it, don't do so
+                if ( !group.canBeDeleted( source ) ) {
+                    continue;
+                }
 
-            final boolean created = nodeShardAllocation.auditShard( scope, group, edgeMeta );
+                LOG.info( "Source shards have been fully drained.  Removing shard {}", source );
 
+                final MutationBatch shardRemoval = edgeShardSerialization.removeShardMeta( scope, source, edgeMeta );
+                shardRemovalRollup.mergeShallow( shardRemoval );
 
-            if ( !created ) {
-                return AuditResult.CHECKED_NO_OP;
+                resultBuilder.withRemovedShard( source );
             }
 
 
-            return AuditResult.CHECKED_CREATED;
+            HystrixCassandra.async( shardRemovalRollup );
+
+
+            LOG.info( "Shard has been fully compacted.  Marking shard {} as compacted in Cassandra", targetShard );
+
+            //Overwrite our shard index with a newly created one that has been marked as compacted
+            Shard compactedShard = new Shard( targetShard.getShardIndex(), timeService.getCurrentTime(), true );
+            final MutationBatch updateMark = edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta );
+            HystrixCassandra.async( updateMark );
+
+            resultBuilder.withCompactedShard( compactedShard );
         }
 
-        //check our taskmanager
+        return resultBuilder.build();
+    }
 
 
+    @Override
+    public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
+                                                             final DirectedEdgeMeta edgeMeta,
+                                                             final ShardEntryGroup group ) {
+
+        final double repairChance = random.nextDouble();
+
+
+        //don't audit, we didn't hit our chance
+        if ( repairChance > graphFig.getShardRepairChance() ) {
+            return Futures.immediateFuture( AuditResult.NOT_CHECKED );
+        }
+
         /**
-         * Do the compaction
+         * Try and submit.  During back pressure, we may not be able to submit, that's ok.  Better to drop than to
+         * hose the system
          */
-        if ( group.shouldCompact( timeService.getCurrentTime() ) ) {
-            compact( scope, edgeMeta, group );
-            return AuditResult.COMPACTING;
-        }
+        ListenableFuture<AuditResult> future = executorService.submit( new Callable<AuditResult>() {
+            @Override
+            public AuditResult call() throws Exception {
+
+
+                /**
+                 * We don't have a compaction pending.  Run an audit on the shards
+                 */
+                if ( !group.isCompactionPending() ) {
+
+                    /**
+                     * Check if we should allocate, we may want to
+                     */
+
+                    /**
+                     * It's already compacting, don't do anything
+                     */
+                    if ( !shardAuditTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
+                        return AuditResult.CHECKED_NO_OP;
+                    }
+
+                    try {
+
+                        final boolean created = nodeShardAllocation.auditShard( scope, group, edgeMeta );
+                        if ( !created ) {
+                            return AuditResult.CHECKED_NO_OP;
+                        }
+                    }
+                    finally {
+                        shardAuditTaskTracker.complete( scope, edgeMeta, group );
+                    }
+
+
+                    return AuditResult.CHECKED_CREATED;
+                }
+
+                //check our taskmanager
+
+
+                /**
+                 * Do the compaction
+                 */
+                if ( group.shouldCompact( timeService.getCurrentTime() ) ) {
+                    /**
+                     * It's already compacting, don't do anything
+                     */
+                    if ( !shardCompactionTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
+                        return AuditResult.COMPACTING;
+                    }
+
+                    /**
+                     * We use a finally b/c we always want to remove the task track
+                     */
+                    try {
+                        CompactionResult result = compact( scope, edgeMeta, group );
+                        LOG.info( "Compaction result for compaction of scope {} with edge meta data of {} and shard group {} is {}", new Object[]{scope, edgeMeta, group, result} );
+                    }
+                    finally {
+                        shardCompactionTaskTracker.complete( scope, edgeMeta, group );
+                    }
+                    return AuditResult.COMPACTED;
+                }
+
+                //no op, there's nothing we need to do to this shard
+                return AuditResult.NOT_CHECKED;
+            }
+        } );
+
+        /**
+         * Log our success or failures for debugging purposes
+         */
+        Futures.addCallback( future, new FutureCallback<AuditResult>() {
+            @Override
+            public void onSuccess( @Nullable final AuditResult result ) {
+                LOG.debug( "Successfully completed audit of task {}", result );
+            }
 
-        //no op, there's nothing we need to do to this shard
-        return AuditResult.NOT_CHECKED;
+
+            @Override
+            public void onFailure( final Throwable t ) {
+                LOG.error( "Unable to perform audit.  Exception is ", t );
+            }
+        } );
+
+        return future;
     }
 
 
-    private static final class ShardCompactionTaskTracker {
-        private BitSet runningTasks = new BitSet();
+    /**
+     * Inner class used to track running tasks per instance
+     */
+    private static abstract class TaskTracker {
+
+        private static final Boolean TRUE = true;
+
+        private ConcurrentHashMap<Long, Boolean> runningTasks = new ConcurrentHashMap<>();
 
 
         /**
          * Sets this data into our scope to signal it's running to stop other threads from attempting to run
-         * @param scope
-         * @param edgeMeta
-         * @param group
-         * @return
          */
-        public boolean shouldStartCompaction( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+        public boolean canStartTask( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
                                      ShardEntryGroup group ) {
-            final int hash = doHash( scope, edgeMeta, group ).asInt();
+            final Long hash = doHash( scope, edgeMeta, group ).hash().asLong();
 
-            if(runningTasks.get( hash )){
-                return false;
-            }
-
-            runningTasks.set( hash );
+            final Boolean returned = runningTasks.putIfAbsent( hash, TRUE );
 
-            return true;
+            /**
+             * Someone already put the value
+             */
+            return returned == null;
         }
 
 
         /**
          * Mark this entry group as complete
-         * @param scope
-         * @param edgeMeta
-         * @param group
          */
-        public void complete( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
-                                             ShardEntryGroup group ) {
-            final int hash = doHash( scope, edgeMeta, group ).asInt();
-            runningTasks.clear( hash );
+        public void complete( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, ShardEntryGroup group ) {
+            final long hash = doHash( scope, edgeMeta, group ).hash().asLong();
+            runningTasks.remove( hash );
         }
 
 
+        protected abstract Hasher doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta,
+                                          final ShardEntryGroup shardEntryGroup );
+    }
+
+
+    /**
+     * Task tracker for shard compaction
+     */
+    private static final class ShardCompactionTaskTracker extends ShardAuditTaskTracker {
+
         /**
          * Hash our data into a consistent long
-         * @param scope
-         * @param directedEdgeMeta
-         * @param shardEntryGroup
-         * @return
          */
-        private HashCode doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta,
+        protected Hasher doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta,
+                                 final ShardEntryGroup shardEntryGroup ) {
+
+            final Hasher hasher = super.doHash( scope, directedEdgeMeta, shardEntryGroup );
+
+            //add our compaction target to the hash
+            final Shard compactionTarget = shardEntryGroup.getCompactionTarget();
+
+            hasher.putLong( compactionTarget.getShardIndex() );
+
+
+            return hasher;
+        }
+    }
+
+
+    /**
+     * Task tracker for shard audit
+     */
+    private static class ShardAuditTaskTracker extends TaskTracker {
+
+        /**
+         * Hash our data into a consistent long
+         */
+        protected Hasher doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta,
                                  final ShardEntryGroup shardEntryGroup ) {
 
             final Hasher hasher = MURMUR_128.newHasher();
@@ -273,35 +478,141 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
             }
 
             //add our compaction target to the hash
-            final Shard compactionTarget = shardEntryGroup.getCompactionTarget();
 
-            hasher.putLong( compactionTarget.getShardIndex() );
 
-
-            return hasher.hash();
+            return hasher;
         }
 
 
-        private void addToHash( final Hasher hasher, final Id id ) {
+        protected void addToHash( final PrimitiveSink into, final Id id ) {
 
             final UUID nodeUuid = id.getUuid();
             final String nodeType = id.getType();
 
-            hasher.putLong( nodeUuid.getMostSignificantBits() ).putLong( nodeUuid.getLeastSignificantBits() )
-                  .putString( nodeType, CHARSET );
+            into.putLong( nodeUuid.getMostSignificantBits() ).putLong( nodeUuid.getLeastSignificantBits() )
+                .putString( nodeType, CHARSET );
+        }
+    }
+
+
+    /**
+     * Create a thread pool that will reject work if our audit tasks become overwhelmed
+     */
+    private final class MaxSizeThreadPool extends ThreadPoolExecutor {
+
+        public MaxSizeThreadPool( final int workerSize, final int queueLength ) {
+            super( 1, workerSize, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>( queueLength ),
+                    new CompactionThreadFactory(), new RejectionLogger() );
+        }
+    }
+
+
+    private final class CompactionThreadFactory implements ThreadFactory {
+
+        private final AtomicLong threadCounter = new AtomicLong();
+
+
+        @Override
+        public Thread newThread( final Runnable r ) {
+            final long newValue = threadCounter.incrementAndGet();
+
+            return new Thread( r, "Graph-Shard-Compaction-" + newValue );
         }
     }
 
-    private enum StartResult{
-        /**
-         * Returned if the compaction was started
-         */
 
-        STARTED,
+    private final class RejectionLogger implements RejectedExecutionHandler {
+
+
+        @Override
+        public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
+            LOG.warn( "Audit queue full, rejecting audit task {}", r );
+        }
+    }
+
+
+
+    public static final class CompactionResult {
+
+        public final long copiedEdges;
+        public final Shard targetShard;
+        public final Set<Shard> sourceShards;
+        public final Set<Shard> removedShards;
+        public final Shard compactedShard;
+
+
+
+        private CompactionResult( final long copiedEdges, final Shard targetShard, final Set<Shard> sourceShards,
+                                  final Set<Shard> removedShards, final Shard compactedShard ) {
+            this.copiedEdges = copiedEdges;
+            this.targetShard = targetShard;
+            this.compactedShard = compactedShard;
+            this.sourceShards = Collections.unmodifiableSet( sourceShards );
+            this.removedShards = Collections.unmodifiableSet( removedShards );
+        }
+
 
         /**
-         * Returned if we are running the compaction
+         * Create a builder to use to create the result
          */
-        RUNNING;
+        public static CompactionBuilder builder() {
+            return new CompactionBuilder();
+        }
+
+
+        @Override
+        public String toString() {
+            return "CompactionResult{" +
+                    "copiedEdges=" + copiedEdges +
+                    ", targetShard=" + targetShard +
+                    ", sourceShards=" + sourceShards +
+                    ", removedShards=" + removedShards +
+                    ", compactedShard=" + compactedShard +
+                    '}';
+        }
+
+
+        public static final class CompactionBuilder {
+            private long copiedEdges;
+            private Shard targetShard;
+            private Set<Shard> sourceShards;
+            private Set<Shard> removedShards = new HashSet<>();
+            private Shard compactedShard;
+
+
+            public CompactionBuilder withCopiedEdges( final long copiedEdges ) {
+                this.copiedEdges = copiedEdges;
+                return this;
+            }
+
+
+            public CompactionBuilder withTargetShard( final Shard targetShard ) {
+                this.targetShard = targetShard;
+                return this;
+            }
+
+
+            public CompactionBuilder withSourceShards( final Set<Shard> sourceShards ) {
+                this.sourceShards = sourceShards;
+                return this;
+            }
+
+
+            public CompactionBuilder withRemovedShard( final Shard removedShard ) {
+                this.removedShards.add( removedShard );
+                return this;
+            }
+
+
+            public CompactionBuilder withCompactedShard( final Shard compactedShard ) {
+                this.compactedShard = compactedShard;
+                return this;
+            }
+
+
+            public CompactionResult build() {
+                return new CompactionResult( copiedEdges, targetShard, sourceShards, removedShards, compactedShard );
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
index 030e4a7..b0523e1 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
@@ -22,9 +22,11 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.UUID;
 
+import javax.annotation.Nullable;
 import javax.inject.Inject;
 
 import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
@@ -49,9 +51,16 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.DescendingTimestampComparator;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.OrderedComparator;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators
+        .SourceDirectedEdgeDescendingComparator;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators
+        .TargetDirectedEdgeDescendingComparator;
 import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
@@ -256,6 +265,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
                             final Shard shard, final boolean isDeleted ) {
 
                 batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( edge );
+                writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
             }
         }.createBatch( scope, shards, timestamp );
     }
@@ -278,6 +288,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
 
                 batch.withRow( columnFamilies.getSourceNodeTargetTypeCfName(), ScopedRowKey.fromKey( scope, rowKey ) )
                      .deleteColumn( edge );
+                writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
             }
         }.createBatch( scope, shards, timestamp );
     }
@@ -297,6 +308,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
                             final Shard shard, final boolean isDeleted ) {
 
                 batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( edge );
+                writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
             }
         }.createBatch( scope, shards, timestamp );
     }
@@ -319,6 +331,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
 
                 batch.withRow( columnFamilies.getTargetNodeSourceTypeCfName(), ScopedRowKey.fromKey( scope, rowKey ) )
                      .deleteColumn( edge );
+                writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
             }
         }.createBatch( scope, shards, timestamp );
     }
@@ -338,6 +351,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
                             final boolean isDeleted ) {
                 batch.withRow( columnFamilies.getGraphEdgeVersions(), ScopedRowKey.fromKey( scope, rowKey ) )
                      .deleteColumn( column );
+                writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
             }
         }.createBatch( scope, shards, timestamp );
     }
@@ -357,8 +371,16 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
                 columnFamilies.getGraphEdgeVersions();
         final Serializer<Long> serializer = columnFamily.getColumnSerializer();
 
+
+        final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( DescendingTimestampComparator.INSTANCE, search.getOrder());
+
+
+
+
         final EdgeSearcher<EdgeRowKey, Long, MarkedEdge> searcher =
-                new EdgeSearcher<EdgeRowKey, Long, MarkedEdge>( scope, maxTimestamp, search.last(), shards ) {
+                new EdgeSearcher<EdgeRowKey, Long, MarkedEdge>( scope, shards, search.getOrder(),  comparator, maxTimestamp,
+                        search.last().transform( TRANSFORM ) ) {
+
 
                     @Override
                     protected Serializer<Long> getSerializer() {
@@ -367,11 +389,11 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
 
 
                     @Override
-                    public void setRange( final RangeBuilder builder ) {
+                    public void buildRange( final RangeBuilder builder ) {
 
 
                         if ( last.isPresent() ) {
-                            super.setRange( builder );
+                            super.buildRange( builder );
                             return;
                         }
 
@@ -387,7 +409,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
 
 
                     @Override
-                    protected Long getStartColumn( final Edge last ) {
+                    protected Long createColumn( final MarkedEdge last ) {
                         return last.getTimestamp();
                     }
 
@@ -398,10 +420,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
                     }
 
 
-                    @Override
-                    public int compare( final MarkedEdge o1, final MarkedEdge o2 ) {
-                        return Long.compare( o1.getTimestamp(), o2.getTimestamp() );
-                    }
+
                 };
 
         return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
@@ -411,23 +430,27 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
 
     @Override
     public Iterator<MarkedEdge> getEdgesFromSource( final EdgeColumnFamilies columnFamilies,
-                                                    final ApplicationScope scope, final SearchByEdgeType edgeType,
+                                                    final ApplicationScope scope, final SearchByEdgeType search,
                                                     final Collection<Shard> shards ) {
 
         ValidationUtils.validateApplicationScope( scope );
-        GraphValidation.validateSearchByEdgeType( edgeType );
+        GraphValidation.validateSearchByEdgeType( search );
 
-        final Id sourceId = edgeType.getNode();
-        final String type = edgeType.getType();
-        final long maxTimestamp = edgeType.getMaxTimestamp();
+        final Id sourceId = search.getNode();
+        final String type = search.getType();
+        final long maxTimestamp = search.getMaxTimestamp();
         final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily =
                 columnFamilies.getSourceNodeCfName();
         final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
 
 
-        final SourceEdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
-                new SourceEdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
-                        shards ) {
+        final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( TargetDirectedEdgeDescendingComparator.INSTANCE, search.getOrder());
+
+
+
+        final EdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
+                new EdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(), comparator, maxTimestamp,
+                        search.last().transform( TRANSFORM ) ) {
 
 
                     @Override
@@ -443,7 +466,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
 
 
                     @Override
-                    protected DirectedEdge getStartColumn( final Edge last ) {
+                    protected DirectedEdge createColumn( final MarkedEdge last ) {
                         return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
                     }
 
@@ -463,24 +486,26 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
     @Override
     public Iterator<MarkedEdge> getEdgesFromSourceByTargetType( final EdgeColumnFamilies columnFamilies,
                                                                 final ApplicationScope scope,
-                                                                final SearchByIdType edgeType,
+                                                                final SearchByIdType search,
                                                                 final Collection<Shard> shards ) {
 
         ValidationUtils.validateApplicationScope( scope );
-        GraphValidation.validateSearchByEdgeType( edgeType );
+        GraphValidation.validateSearchByEdgeType( search );
 
-        final Id targetId = edgeType.getNode();
-        final String type = edgeType.getType();
-        final String targetType = edgeType.getIdType();
-        final long maxTimestamp = edgeType.getMaxTimestamp();
+        final Id targetId = search.getNode();
+        final String type = search.getType();
+        final String targetType = search.getIdType();
+        final long maxTimestamp = search.getMaxTimestamp();
         final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily =
                 columnFamilies.getSourceNodeTargetTypeCfName();
         final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
 
+        final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( TargetDirectedEdgeDescendingComparator.INSTANCE, search.getOrder());
+
 
-        final SourceEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
-                new SourceEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
-                        shards ) {
+        final EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
+                new EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(), comparator, maxTimestamp,
+                        search.last().transform( TRANSFORM ) ) {
 
                     @Override
                     protected Serializer<DirectedEdge> getSerializer() {
@@ -495,7 +520,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
 
 
                     @Override
-                    protected DirectedEdge getStartColumn( final Edge last ) {
+                    protected DirectedEdge createColumn( final MarkedEdge last ) {
                         return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
                     }
 
@@ -513,20 +538,22 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
 
     @Override
     public Iterator<MarkedEdge> getEdgesToTarget( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
-                                                  final SearchByEdgeType edgeType, final Collection<Shard> shards ) {
+                                                  final SearchByEdgeType search, final Collection<Shard> shards ) {
         ValidationUtils.validateApplicationScope( scope );
-        GraphValidation.validateSearchByEdgeType( edgeType );
+        GraphValidation.validateSearchByEdgeType( search );
 
-        final Id targetId = edgeType.getNode();
-        final String type = edgeType.getType();
-        final long maxTimestamp = edgeType.getMaxTimestamp();
+        final Id targetId = search.getNode();
+        final String type = search.getType();
+        final long maxTimestamp = search.getMaxTimestamp();
         final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily =
                 columnFamilies.getTargetNodeCfName();
         final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
 
-        final TargetEdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
-                new TargetEdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
-                        shards ) {
+        final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( SourceDirectedEdgeDescendingComparator.INSTANCE, search.getOrder());
+
+        final EdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
+                new EdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(),comparator,  maxTimestamp,
+                        search.last().transform( TRANSFORM ) ) {
 
                     @Override
                     protected Serializer<DirectedEdge> getSerializer() {
@@ -541,7 +568,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
 
 
                     @Override
-                    protected DirectedEdge getStartColumn( final Edge last ) {
+                    protected DirectedEdge createColumn( final MarkedEdge last ) {
                         return new DirectedEdge( last.getSourceNode(), last.getTimestamp() );
                     }
 
@@ -561,24 +588,26 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
     @Override
     public Iterator<MarkedEdge> getEdgesToTargetBySourceType( final EdgeColumnFamilies columnFamilies,
                                                               final ApplicationScope scope,
-                                                              final SearchByIdType edgeType,
+                                                              final SearchByIdType search,
                                                               final Collection<Shard> shards ) {
 
         ValidationUtils.validateApplicationScope( scope );
-        GraphValidation.validateSearchByEdgeType( edgeType );
+        GraphValidation.validateSearchByEdgeType( search );
 
-        final Id targetId = edgeType.getNode();
-        final String sourceType = edgeType.getIdType();
-        final String type = edgeType.getType();
-        final long maxTimestamp = edgeType.getMaxTimestamp();
+        final Id targetId = search.getNode();
+        final String sourceType = search.getIdType();
+        final String type = search.getType();
+        final long maxTimestamp = search.getMaxTimestamp();
         final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily =
                 columnFamilies.getTargetNodeSourceTypeCfName();
         final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
 
+        final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( SourceDirectedEdgeDescendingComparator.INSTANCE, search.getOrder());
+
 
-        final TargetEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
-                new TargetEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
-                        shards ) {
+        final EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
+                new EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(), comparator, maxTimestamp,
+                        search.last().transform( TRANSFORM ) ) {
                     @Override
                     protected Serializer<DirectedEdge> getSerializer() {
                         return serializer;
@@ -592,7 +621,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
 
 
                     @Override
-                    protected DirectedEdge getStartColumn( final Edge last ) {
+                    protected DirectedEdge createColumn( final MarkedEdge last ) {
                         return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
                     }
 
@@ -608,51 +637,8 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
     }
 
 
-    /**
-     * Class for performing searched on rows based on source id
-     */
-    private static abstract class SourceEdgeSearcher<R, C, T extends Edge> extends EdgeSearcher<R, C, T> {
-
-        protected SourceEdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional<Edge> last,
-                                      final Collection<Shard> shards ) {
-            super( scope, maxTimestamp, last, shards );
-        }
 
 
-        public int compare( final T o1, final T o2 ) {
-            int compare = Long.compare( o1.getTimestamp(), o2.getTimestamp() );
-
-            if ( compare == 0 ) {
-                compare = o1.getTargetNode().compareTo( o2.getTargetNode() );
-            }
-
-            return compare;
-        }
-    }
-
-
-    /**
-     * Class for performing searched on rows based on target id
-     */
-    private static abstract class TargetEdgeSearcher<R, C, T extends Edge> extends EdgeSearcher<R, C, T> {
-
-        protected TargetEdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional<Edge> last,
-                                      final Collection<Shard> shards ) {
-            super( scope, maxTimestamp, last, shards );
-        }
-
-
-        public int compare( final T o1, final T o2 ) {
-            int compare = Long.compare( o1.getTimestamp(), o2.getTimestamp() );
-
-            if ( compare == 0 ) {
-                compare = o1.getTargetNode().compareTo( o2.getTargetNode() );
-            }
-
-            return compare;
-        }
-    }
-
 
     /**
      * Simple callback to perform puts and deletes with a common row setup code
@@ -994,4 +980,27 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
             return isDeleted;
         }
     }
+
+
+
+
+
+
+    private static final Function<Edge, MarkedEdge> TRANSFORM = new Function<Edge, MarkedEdge>() {
+        @Nullable
+        @Override
+        public MarkedEdge apply( @Nullable final Edge input ) {
+
+            if ( input == null ) {
+                return null;
+            }
+
+            if ( input instanceof MarkedEdge ) {
+                return ( MarkedEdge ) input;
+            }
+
+            return new SimpleMarkedEdge( input.getSourceNode(), input.getType(), input.getTargetNode(),
+                    input.getTimestamp(), false );
+        }
+    };
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
index 0c7e5b5..d99d98b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
@@ -8,6 +8,7 @@ import java.util.NoSuchElementException;
 
 import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
 import org.apache.usergrid.persistence.core.astyanax.MultiKeyColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -86,6 +87,7 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T> {
      */
     private void startIterator() {
 
+
         /**
          * If the edge is present, we need to being seeking from this
          */
@@ -94,7 +96,9 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T> {
 
 
         //set the range into the search
-        searcher.setRange( rangeBuilder );
+        searcher.buildRange( rangeBuilder );
+
+
 
         /**
          * Get our list of slices
@@ -102,27 +106,22 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T> {
         final List<ScopedRowKey<ApplicationScope, R>> rowKeys = searcher.getRowKeys();
 
 
-        final List<ColumnNameIterator<C, T>> columnNameIterators = new ArrayList<>( rowKeys.size() );
-
-        for(ScopedRowKey<ApplicationScope, R> rowKey: rowKeys){
-
+        if(rowKeys.size() == 1){
 
+            final  RowQuery<ScopedRowKey<ApplicationScope, R>, C> query =
+                           keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKey( rowKeys.get( 0 ) )
+                                   .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
 
-           final  RowQuery<ScopedRowKey<ApplicationScope, R>, C> query =
-                    keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKey( rowKey )
-                            .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
-
-
-            final ColumnNameIterator<C, T> columnNameIterator = new ColumnNameIterator<>( query, searcher, searcher.hasPage() );
+            currentColumnIterator = new ColumnNameIterator<>( query, searcher, searcher.hasPage() );
+        }
 
-            columnNameIterators.add( columnNameIterator );
+        else{
 
+            currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf,  consistencyLevel, searcher, searcher, searcher.getComparator(), rowKeys, pageSize);
         }
 
 
 
-        currentColumnIterator = new MultiKeyColumnNameIterator<>(columnNameIterators, searcher, pageSize);
-
 
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
index 9050b0a..ddd514b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
@@ -37,6 +37,10 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumn
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeRowKeySerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.RowSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.RowTypeSerializer;
 
 import com.netflix.astyanax.serializers.LongSerializer;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DescendingTimestampComparator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DescendingTimestampComparator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DescendingTimestampComparator.java
new file mode 100644
index 0000000..6e52dd5
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DescendingTimestampComparator.java
@@ -0,0 +1,43 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators;
+
+
+import java.util.Comparator;
+
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+
+
+/**
+ * Sorts longs from high to low.  High values are considered "less than" low values.
+ *
+ */
+public class DescendingTimestampComparator implements Comparator<MarkedEdge> {
+
+    public static final DescendingTimestampComparator INSTANCE = new DescendingTimestampComparator();
+
+
+    @Override
+    public int compare( final MarkedEdge o1, final MarkedEdge o2 ) {
+        return Long.compare( o1.getTimestamp(), o2.getTimestamp() )*-1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DirectedEdgeDescendingComparator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DirectedEdgeDescendingComparator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DirectedEdgeDescendingComparator.java
new file mode 100644
index 0000000..6ff65cb
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DirectedEdgeDescendingComparator.java
@@ -0,0 +1,69 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators;
+
+
+import java.util.Comparator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.fasterxml.uuid.impl.UUIDUtil;
+
+
+/**
+ * Comparator for comparing edges in descending order.  The first comparison is the timestamp;
+ * the highest value should be first, so it is considered "less".  If the timestamps are equal,
+ * the UUIDs of the Ids returned by getId are compared in descending order, then the Id types
+ * in ascending order.
+ */
+public abstract class DirectedEdgeDescendingComparator implements Comparator<MarkedEdge> {
+
+    @Override
+    public int compare( final MarkedEdge first, final MarkedEdge second ) {
+
+        int compare = Long.compare( first.getTimestamp(), second.getTimestamp() ) * -1;
+
+        if(compare == 0){
+            final Id firstId = getId( first );
+            final Id secondId = getId( second );
+
+            compare = UUIDComparator.staticCompare( firstId.getUuid(), secondId.getUuid() ) * -1;
+
+            if(compare == 0){
+                compare = firstId.getType().compareTo( secondId.getType() );
+            }
+        }
+
+        return compare;
+    }
+
+
+    /**
+     * Return the Id to be used in the comparison
+     * @param edge
+     * @return
+     */
+    protected abstract Id getId(final MarkedEdge edge);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/OrderedComparator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/OrderedComparator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/OrderedComparator.java
new file mode 100644
index 0000000..003ed36
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/OrderedComparator.java
@@ -0,0 +1,52 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators;
+
+
+import java.util.Comparator;
+
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+
+
+/**
+ * Comparator that will compare in reverse or forward order based on the order type specified.
+ *
+ * Assumes descending uses the default order.  If ASCENDING, the result of the comparator will be reversed
+ */
+public class OrderedComparator<T> implements Comparator<T> {
+
+
+    private final int invert;
+    private final Comparator<T> delegate;
+
+
+    public OrderedComparator( final Comparator<T> delegate, final SearchByEdgeType.Order order ) {
+        this.invert = order == SearchByEdgeType.Order.DESCENDING ? 1 : -1;
+        this.delegate = delegate;
+    }
+
+
+    @Override
+    public int compare( final T o1, final T o2 ) {
+        return delegate.compare( o1, o2 ) * invert;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/SourceDirectedEdgeDescendingComparator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/SourceDirectedEdgeDescendingComparator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/SourceDirectedEdgeDescendingComparator.java
new file mode 100644
index 0000000..f067006
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/SourceDirectedEdgeDescendingComparator.java
@@ -0,0 +1,42 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators;
+
+
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ *
+ * Comparator that uses the source Id for comparisons.  Newer timestamps will be first.  Newer UUIDs will be first.
+ *
+ */
+public class SourceDirectedEdgeDescendingComparator extends DirectedEdgeDescendingComparator {
+
+    public static final SourceDirectedEdgeDescendingComparator INSTANCE = new SourceDirectedEdgeDescendingComparator();
+
+    @Override
+    protected Id getId( final MarkedEdge edge ) {
+        return edge.getSourceNode();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/TargetDirectedEdgeDescendingComparator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/TargetDirectedEdgeDescendingComparator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/TargetDirectedEdgeDescendingComparator.java
new file mode 100644
index 0000000..115a874
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/TargetDirectedEdgeDescendingComparator.java
@@ -0,0 +1,42 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators;
+
+
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ *
+ * Comparator that uses the target Id for comparisons.  Newer time uuids will be first.
+ *
+ */
+public class TargetDirectedEdgeDescendingComparator extends DirectedEdgeDescendingComparator {
+
+    public static final TargetDirectedEdgeDescendingComparator INSTANCE = new TargetDirectedEdgeDescendingComparator();
+
+    @Override
+    protected Id getId( final MarkedEdge edge ) {
+        return edge.getTargetNode();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeRowKeySerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeRowKeySerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeRowKeySerializer.java
new file mode 100644
index 0000000..f1ae90b
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeRowKeySerializer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize;
+
+
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+
+
+/**
+ * Class to perform serialization for row keys from edges
+ */
+
+public class EdgeRowKeySerializer implements CompositeFieldSerializer<EdgeRowKey> {
+
+    private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+
+    @Override
+    public void toComposite( final CompositeBuilder builder, final EdgeRowKey key ) {
+
+        //add the row id to the composite
+        ID_SER.toComposite( builder, key.sourceId );
+        builder.addString( key.edgeType );
+        ID_SER.toComposite( builder, key.targetId );
+        builder.addLong( key.shardId );
+    }
+
+
+    @Override
+    public EdgeRowKey fromComposite( final CompositeParser composite ) {
+
+        final Id sourceId = ID_SER.fromComposite( composite );
+        final String edgeType = composite.readString();
+        final Id targetId = ID_SER.fromComposite( composite );
+        final long shard = composite.readLong();
+
+        return new EdgeRowKey( sourceId, edgeType, targetId, shard );
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeSerializer.java
new file mode 100644
index 0000000..590cf35
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeSerializer.java
@@ -0,0 +1,77 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize;
+
+
+import java.nio.ByteBuffer;
+
+import org.apache.usergrid.persistence.core.astyanax.IdColDynamicCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Preconditions;
+import com.netflix.astyanax.model.DynamicComposite;
+import com.netflix.astyanax.serializers.AbstractSerializer;
+import com.netflix.astyanax.serializers.LongSerializer;
+
+
+/**
+ * Serializes to a source->target edge.  Note that we cannot set the edge type on de-serialization;
+ * only the target Id and version are recovered.
+ */
+public class EdgeSerializer extends AbstractSerializer<DirectedEdge> {
+
+    private static final IdColDynamicCompositeSerializer ID_COL_SERIALIZER = IdColDynamicCompositeSerializer.get();
+    private static final LongSerializer LONG_SERIALIZER = LongSerializer.get();
+
+
+    @Override
+    public ByteBuffer toByteBuffer( final DirectedEdge edge ) {
+
+        DynamicComposite composite = new DynamicComposite();
+
+        composite.addComponent( edge.timestamp, LONG_SERIALIZER );
+
+        ID_COL_SERIALIZER.toComposite( composite, edge.id );
+
+        return composite.serialize();
+    }
+
+
+    @Override
+    public DirectedEdge fromByteBuffer( final ByteBuffer byteBuffer ) {
+        DynamicComposite composite = DynamicComposite.fromByteBuffer( byteBuffer );
+
+        Preconditions.checkArgument( composite.size() == 3, "Composite should have 3 elements" );
+
+
+        //read the timestamp
+        final long timestamp = composite.get( 0, LONG_SERIALIZER );
+
+
+        //parse our id
+        final Id id = ID_COL_SERIALIZER.fromComposite( composite, 1 );
+
+
+        return new DirectedEdge( id, timestamp );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeShardRowKeySerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeShardRowKeySerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeShardRowKeySerializer.java
new file mode 100644
index 0000000..6b1c4e9
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeShardRowKeySerializer.java
@@ -0,0 +1,103 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize;
+
+
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+
+
+public class EdgeShardRowKeySerializer implements CompositeFieldSerializer<DirectedEdgeMeta> {
+
+    private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+    public static final EdgeShardRowKeySerializer INSTANCE = new EdgeShardRowKeySerializer();
+
+
+    @Override
+    public void toComposite( final CompositeBuilder builder, final DirectedEdgeMeta meta ) {
+
+
+        final DirectedEdgeMeta.NodeMeta[] nodeMeta = meta.getNodes();
+
+        //add the stored value
+        builder.addInteger( meta.getType().getStorageValue() );
+
+        final int length = nodeMeta.length;
+
+        builder.addInteger( length );
+
+
+        for ( DirectedEdgeMeta.NodeMeta node : nodeMeta ) {
+            ID_SER.toComposite( builder, node.getId() );
+            builder.addInteger( node.getNodeType().getStorageValue() );
+        }
+
+        final String[] edgeTypes = meta.getTypes();
+
+        builder.addInteger( edgeTypes.length );
+
+        for ( String type : edgeTypes ) {
+            builder.addString( type );
+        }
+    }
+
+
+    @Override
+    public DirectedEdgeMeta fromComposite( final CompositeParser composite ) {
+
+
+        final int storageType = composite.readInteger();
+
+        final DirectedEdgeMeta.MetaType metaType = DirectedEdgeMeta.MetaType.fromStorage( storageType );
+
+        final int idLength = composite.readInteger();
+
+        final DirectedEdgeMeta.NodeMeta[] nodePairs = new DirectedEdgeMeta.NodeMeta[idLength];
+
+
+        for ( int i = 0; i < idLength; i++ ) {
+            final Id sourceId = ID_SER.fromComposite( composite );
+
+            final NodeType type = NodeType.get( composite.readInteger() );
+
+            nodePairs[i] = new DirectedEdgeMeta.NodeMeta( sourceId, type );
+        }
+
+
+        final int length = composite.readInteger();
+
+        String[] types = new String[length];
+
+        for ( int i = 0; i < length; i++ ) {
+            types[i] = composite.readString();
+        }
+
+        return  DirectedEdgeMeta.fromStorage( metaType, nodePairs, types );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowSerializer.java
new file mode 100644
index 0000000..8376ef1
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowSerializer.java
@@ -0,0 +1,63 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize;
+
+
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+
+
+/**
+ * Class to perform serialization for row keys from edges
+ */
+public class RowSerializer implements CompositeFieldSerializer<RowKey> {
+
+    private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+
+    @Override
+    public void toComposite( final CompositeBuilder builder, final RowKey key ) {
+
+        //add the row id to the composite
+        ID_SER.toComposite( builder, key.nodeId );
+
+        builder.addString( key.edgeType );
+        builder.addLong( key.shardId );
+    }
+
+
+    @Override
+    public RowKey fromComposite( final CompositeParser composite ) {
+
+        final Id id = ID_SER.fromComposite( composite );
+        final String edgeType = composite.readString();
+        final long shard = composite.readLong();
+
+
+        return new RowKey( id, edgeType, shard );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowTypeSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowTypeSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowTypeSerializer.java
new file mode 100644
index 0000000..a67c469
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowTypeSerializer.java
@@ -0,0 +1,62 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize;
+
+
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+
+
+public class RowTypeSerializer implements CompositeFieldSerializer<RowKeyType> {
+
+    private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+
+    @Override
+    public void toComposite( final CompositeBuilder builder, final RowKeyType keyType ) {
+
+        //add the row id to the composite
+        ID_SER.toComposite( builder, keyType.nodeId );
+
+        builder.addString( keyType.edgeType );
+        builder.addString( keyType.idType );
+
+        builder.addLong( keyType.shardId );
+    }
+
+
+    @Override
+    public RowKeyType fromComposite( final CompositeParser composite ) {
+
+        final Id id = ID_SER.fromComposite( composite );
+        final String edgeType = composite.readString();
+        final String idType = composite.readString();
+        final long shard = composite.readLong();
+
+        return new RowKeyType( id, edgeType, idType, shard);
+    }
+}