You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/29 19:34:02 UTC

[04/20] Implemented New rolling shard algorithm. This should allow eventual shard consistency between all clients without the need for an external locking allocation system

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java
new file mode 100644
index 0000000..9bd9937
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java
@@ -0,0 +1,45 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Class that represents an edge row key
+ */
+public class RowKey {
+    public final Id nodeId;
+    public final String edgeType;
+    public final long shardId;
+
+
+    /**
+     * Create a row key with the node id, the edgeType and the shard id
+     */
+    public RowKey( Id nodeId, String edgeType, final long shardId ) {
+        this.nodeId = nodeId;
+        this.edgeType = edgeType;
+        this.shardId = shardId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java
new file mode 100644
index 0000000..6e69bbf
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java
@@ -0,0 +1,61 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * The row key with the additional type
+ */
+public class RowKeyType extends RowKey {
+
+    public final String idType;
+
+    /**
+     * Create a row key with the node id in the row key, the edge type, and the type from the typeid
+     *
+     * @param nodeId The node id in the row key
+     * @param edgeType The type of the edge
+     * @param typeId The id whose type is stored as the id type
+     */
+    public RowKeyType( final Id nodeId, final String edgeType, final Id typeId, final long shardId ) {
+        this( nodeId, edgeType, typeId.getType(), shardId );
+    }
+
+
+    /**
+     * Create a row key with the node id in the row key, the edge type, and the id type
+     *
+     * @param nodeId The node id in the row key
+     * @param edgeType The type of the edge
+     * @param typeId The type of the id
+     */
+    public RowKeyType( final Id nodeId, final String edgeType, final String typeId, final long shardId ) {
+        super( nodeId, edgeType, shardId );
+        this.idType = typeId;
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
new file mode 100644
index 0000000..38fe51c
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+public class Shard implements Comparable<Shard> {
+
+    private final long shardIndex;
+    private final long createdTime;
+    private final boolean compacted;
+
+
+    public Shard( final long shardIndex, final long createdTime, final boolean compacted ) {
+        this.shardIndex = shardIndex;
+        this.createdTime = createdTime;
+        this.compacted = compacted;
+    }
+
+
+    /**
+     * Get the long shard index
+     */
+    public long getShardIndex() {
+        return shardIndex;
+    }
+
+
+    /**
+     * Get the timestamp in epoch millis this shard was created
+     */
+    public long getCreatedTime() {
+        return createdTime;
+    }
+
+
+    /**
+     * Return true if this shard has been compacted
+     */
+    public boolean isCompacted() {
+        return compacted;
+    }
+
+
+    /**
+     * Compare the shards based on the shard index first, then the created time second.  Compacted breaks ties
+     */
+    @Override
+    public int compareTo( final Shard o ) {
+        if ( o == null ) {
+            return 1;
+        }
+
+        if ( shardIndex > o.shardIndex ) {
+            return 1;
+        }
+
+        else if ( shardIndex == o.shardIndex ) {
+            if ( createdTime > o.createdTime ) {
+                return 1;
+            }
+            else if ( createdTime < o.createdTime ) {
+                return -1;
+            }
+
+            else {
+
+                //kind of arbitrary compacted takes precedence
+                if ( compacted && !o.compacted ) {
+                    return 1;
+                }
+
+                else if ( !compacted && o.compacted ){
+                    return -1;
+                }
+
+
+            }
+            return 0;
+        }
+
+        return -1;
+    }
+
+
+    @Override
+    public boolean equals( final Object o ) {
+        if ( this == o ) {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() ) {
+            return false;
+        }
+
+        final Shard shard = ( Shard ) o;
+
+        if ( compacted != shard.compacted ) {
+            return false;
+        }
+        if ( createdTime != shard.createdTime ) {
+            return false;
+        }
+        if ( shardIndex != shard.shardIndex ) {
+            return false;
+        }
+
+        return true;
+    }
+
+
+    @Override
+    public int hashCode() {
+        int result = ( int ) ( shardIndex ^ ( shardIndex >>> 32 ) );
+        result = 31 * result + ( int ) ( createdTime ^ ( createdTime >>> 32 ) );
+        result = 31 * result + ( compacted ? 1 : 0 );
+        return result;
+    }
+
+
+    @Override
+    public String toString() {
+        return "Shard{" +
+                "shardIndex=" + shardIndex +
+                ", createdTime=" + createdTime +
+                "} ";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
new file mode 100644
index 0000000..59ea9fb
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * There are cases where we need to read or write to more than 1 shard.  This object encapsulates a set of shards that
+ * should be written to and read from.  All reads should combine the data sets from all shards in the group, and writes
+ * should be written to each shard.  Once the shard can safely be compacted a background process should be triggered to
+ * remove additional shards and make seeks faster.  This multiread/write should only occur during the time period of the
+ * delta (in milliseconds), after which the next read will asynchronously compact the shards into a single shard.
+ */
+public class ShardEntryGroup {
+
+
+    private List<Shard> shards;
+
+    private final long delta;
+
+    private long maxCreatedTime;
+
+    private Shard compactionTarget;
+
+
+    /**
+     * The max delta we accept in milliseconds for create time to be considered a member of this group
+     */
+    public ShardEntryGroup( final long delta ) {
+        Preconditions.checkArgument( delta > 0, "delta must be greater than 0" );
+        this.delta = delta;
+        this.shards = new ArrayList<>();
+        this.maxCreatedTime = 0;
+    }
+
+
+    /**
+     * Only add a shard if it meets the rules required to join this group.  Returns true if the shard was added.
+     *
+     * Case 1) First shard in the group, always added
+     *
+     * Case 2) The last shard added is not compacted, the new shard is added since no lower bound exists yet
+     *
+     * Case 3) The last shard added is compacted.  It is the lower bound of this group, so the new shard is not
+     * added and false is returned.  Shards must be added in descending order and must not be null
+     */
+    public boolean addShard( final Shard shard ) {
+        //the reference to check comes first, the error message second
+        Preconditions.checkNotNull( shard, "shard cannot be null" );
+
+        final int size = shards.size();
+
+        if ( size == 0 ) {
+            addShardInternal( shard );
+            return true;
+        }
+
+        final Shard minShard = shards.get( size - 1 );
+
+        Preconditions.checkArgument( minShard.compareTo( shard ) > 0, "shard must be less than the current max" );
+
+        //the last shard added isn't compacted, so there is no lower bound yet.  Include this shard in the group
+        if ( !minShard.isCompacted() ) {
+            addShardInternal( shard );
+            return true;
+        }
+
+
+        return false;
+    }
+
+
+    /**
+     * Add the shard, update the max created time and reset the cached compaction target
+     */
+    private void addShardInternal( final Shard shard ) {
+        shards.add( shard );
+
+        maxCreatedTime = Math.max( maxCreatedTime, shard.getCreatedTime() );
+
+        //we're changing our structure, unset the compaction target
+        compactionTarget = null;
+    }
+
+
+    /**
+     * Return the minimum shard in this group (the last one added), or null if the group is empty
+     */
+    public Shard getMinShard() {
+        final int size = shards.size();
+
+        if ( size < 1 ) {
+            return null;
+        }
+
+        return shards.get( size - 1 );
+    }
+
+
+    /**
+     * Get the entries that we should read from.
+     */
+    public Collection<Shard> getReadShards() {
+        return shards;
+    }
+
+
+    /**
+     * Get the entries, with the max shard time being first. We write to all shards until they're migrated
+     */
+    public Collection<Shard> getWriteShards( long currentTime ) {
+
+        /**
+         * The shards in this set can be combined, we should only write to the compaction target to avoid
+         * adding data to other shards
+         */
+        if ( !isTooSmallToCompact() && shouldCompact( currentTime ) ) {
+            return Collections.singleton( getCompactionTarget() );
+        }
+
+
+        return shards;
+    }
+
+
+    /**
+     * Return true if we have a pending compaction
+     */
+    public boolean isCompactionPending() {
+        return !isTooSmallToCompact();
+    }
+
+
+    /**
+     * Get the shard all compactions should write to.  Null indicates we cannot find a shard that could be used as a
+     * compaction target.  Note that this shard may not have surpassed the delta yet.  You should invoke "shouldCompact"
+     * first to ensure all criteria are met before initiating compaction
+     */
+    public Shard getCompactionTarget() {
+
+        if ( compactionTarget != null ) {
+            return compactionTarget;
+        }
+
+
+        //we have < 2 shards, we can't compact
+        if ( isTooSmallToCompact() ) {
+            return null;
+        }
+
+
+        final int lastIndex = shards.size() - 1;
+
+        final Shard last = shards.get( lastIndex );
+
+        //Our oldest isn't compacted. As a result we have no "bookend" to delimit this entry group.  Therefore we
+        // can't compact
+        if ( !last.isCompacted() ) {
+            return null;
+        }
+
+        //Start seeking from the end of our group.  The first shard we encounter that is not compacted is our
+        // compaction target
+        //NOTE: This does not mean we can compact, rather it's just an indication that we have a target set.
+        for ( int i = lastIndex - 1; i > -1; i-- ) {
+            final Shard compactionCandidate = shards.get( i );
+
+
+            if ( !compactionCandidate.isCompacted() ) {
+                compactionTarget = compactionCandidate;
+                break;
+            }
+        }
+
+        return compactionTarget;
+    }
+
+
+    /**
+     * Return the number of entries in this shard group
+     */
+    public int entrySize() {
+        return shards.size();
+    }
+
+
+    /**
+     * Return true if there are not enough elements in this entry group to consider compaction
+     */
+    private boolean isTooSmallToCompact() {
+        return shards.size() < 2;
+    }
+
+
+    /**
+     * Returns true if a compaction target exists and the newest created shard is older than currentTime - delta
+     *
+     * @param currentTime The current system time in milliseconds
+     *
+     * @return True if these shards can safely be combined into a single shard, false otherwise
+     */
+    public boolean shouldCompact( final long currentTime ) {
+
+        /**
+         * We don't have enough shards to compact, ignore
+         */
+        return getCompactionTarget() != null
+
+
+                /**
+                 * If something was created within the delta time frame, not everyone may have seen it due to
+                 * cache refresh, we can't compact yet.
+                 */
+
+                && currentTime - delta > maxCreatedTime;
+    }
+
+
+    /**
+     * Return true if this shard can be deleted AFTER all of the data in it has been moved
+     */
+    public boolean canBeDeleted( final Shard shard ) {
+        //if we're a neighbor shard (n-1) or the target compaction shard, we can't be deleted
+        //we purposefully use shard index comparison over .equals here, since 2 shards might have the same index with
+        // different timestamps
+        // (unlikely but could happen)
+
+        final Shard compactionTarget = getCompactionTarget();
+
+
+        return !shard.isCompacted() && ( compactionTarget != null && compactionTarget.getShardIndex() != shard
+                .getShardIndex() );
+    }
+
+
+    /**
+     * Helper method to create a shard entry group with a single shard
+     */
+    public static ShardEntryGroup singletonGroup( final Shard shard, final long delta ) {
+        ShardEntryGroup group = new ShardEntryGroup( delta );
+        group.addShard( shard );
+
+        return group;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
new file mode 100644
index 0000000..13c5596
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -0,0 +1,43 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import rx.Observable;
+
+
+/**
+ * Defines tasks for running compaction
+ *
+ *
+ */
+public interface ShardGroupCompaction {
+
+
+    /**
+     * Execute the compaction task.  Will return the number of edges that have been compacted
+     * @param group The shard entry group to compact
+     * @return The number of edges that are now compacted into the target shard
+     */
+    public Observable<Integer> compact(ShardEntryGroup group);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
new file mode 100644
index 0000000..d1ad18d
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.SearchByIdType;
+
+import com.netflix.astyanax.MutationBatch;
+
+
+/**
+ * Performs serialization on the shards
+ */
+public interface ShardedEdgeSerialization {
+
+    /**
+     * Write both the source--->Target edge and the target <----- source edge into the mutation
+     *
+     * @param columnFamilies The column families to use
+     * @param scope The org scope of the graph
+     * @param markedEdge The edge to write
+     * @param timestamp The timestamp to use
+     */
+    MutationBatch writeEdge( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+                             UUID timestamp );
+
+    /**
+     * Delete both the source -->target edge and the target<--- source edge in the mutation
+     *
+     * @param columnFamilies The column families to use
+     * @param scope The org scope of the graph
+     * @param markedEdge The edge to delete
+     * @param timestamp The timestamp of the uuid
+     */
+    MutationBatch deleteEdge( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+                              UUID timestamp );
+
+
+    /**
+     * Search for all versions of this edge < the search version.  Returns all versions
+     *
+     * @param columnFamilies The column families to use
+     * @param scope The application scope
+     * @param search The search criteria
+     * @param shards The shards to iterate when searching
+     */
+    Iterator<MarkedEdge> getEdgeVersions( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+                                          SearchByEdge search, Iterator<ShardEntryGroup> shards );
+
+    /**
+     * Get an iterator of all edges by edge type originating from source node
+     *
+     * @param columnFamilies The column families to use
+     * @param scope The application scope
+     * @param search The search criteria
+     * @param shards The shards to iterate when searching
+     */
+    Iterator<MarkedEdge> getEdgesFromSource( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+                                             SearchByEdgeType search, Iterator<ShardEntryGroup> shards );
+
+
+    /**
+     * Get an iterator of all edges by edge type originating from source node.  Also filters by target node id type
+     *
+     * @param columnFamilies The column families to use
+     * @param scope The application scope
+     * @param search The search criteria
+     * @param shards The shards to iterate when searching
+     */
+    Iterator<MarkedEdge> getEdgesFromSourceByTargetType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+                                                         SearchByIdType search, Iterator<ShardEntryGroup> shards );
+
+    /**
+     * Get an iterator of all edges by edge type pointing to the target node.  Returns all versions
+     *
+     * @param columnFamilies The column families to use
+     * @param scope The application scope
+     * @param search The search criteria
+     * @param shards The shards to iterate when searching
+     */
+    Iterator<MarkedEdge> getEdgesToTarget( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+                                           SearchByEdgeType search, Iterator<ShardEntryGroup> shards );
+
+
+    /**
+     * Get an iterator of all edges by edge type pointing to the target node.  Also uses the source id type to limit the
+     * results
+     *
+     * @param columnFamilies The column families to use
+     * @param scope The application scope
+     * @param search The search criteria
+     * @param shards The shards to iterate when searching
+     */
+    Iterator<MarkedEdge> getEdgesToTargetBySourceType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+                                                       SearchByIdType search, Iterator<ShardEntryGroup> shards );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
index 4318200..f5666a2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
@@ -24,6 +24,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.base.Preconditions;
+
 
 /**
  * This class is synchronized for addition.  It is meant to be used across multiple threads
@@ -35,7 +37,7 @@ public class Counter {
     private final AtomicLong invokeCounter;
 
     /**
-     * Pointer to our "current" counter map.  We flush this when time expires or we hit our count
+     * Pointer to our "current" counter map.  We beginFlush this when time expires or we hit our count
      */
     private final ConcurrentHashMap<ShardKey, AtomicLong> counts;
 
@@ -94,6 +96,10 @@ public class Counter {
      * @param other
      */
     public void merge(final Counter other){
+
+        Preconditions.checkNotNull(other, "other cannot be null");
+        Preconditions.checkNotNull( other.counts, "other.counts cannot be null" );
+
         for(Map.Entry<ShardKey, AtomicLong> entry: other.counts.entrySet()){
             add(entry.getKey(), entry.getValue().get());
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
index 7dc763f..47243e5 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
@@ -20,16 +20,22 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
 
 
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.inject.Inject;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
-import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.hystrix.HystrixCommand;
@@ -40,10 +46,12 @@ import rx.schedulers.Schedulers;
 
 /**
  * Implementation for doing edge approximation based on counters.  Uses a guava loading cache to load values from
- * cassandra, and flush them on cache eviction.
+ * cassandra, and beginFlush them on cache eviction.
  */
 public class NodeShardApproximationImpl implements NodeShardApproximation {
 
+    private static final Logger LOG = LoggerFactory.getLogger(NodeShardApproximationImpl.class);
+
     /**
      * Read write locks to ensure we atomically swap correctly
      */
@@ -63,7 +71,9 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
     /**
      * The counter that is currently in process of flushing to Cassandra.  Can be null
      */
-    private volatile Counter flushPending;
+    private final BlockingQueue<Counter> flushQueue;
+
+    private final FlushWorker worker;
 
 
     /**
@@ -77,15 +87,22 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
         this.nodeShardCounterSerialization = nodeShardCounterSerialization;
         this.timeService = timeService;
         this.currentCounter = new Counter();
+        this.flushQueue = new LinkedBlockingQueue<>( graphFig.getCounterFlushQueueSize() );
+
+        this.worker = new FlushWorker( this.flushQueue, nodeShardCounterSerialization );
+
+        Schedulers.newThread().createWorker().schedule( worker );
+
     }
 
 
     @Override
-    public void increment( final ApplicationScope scope, final Id nodeId, final long shardId, final long count,
-                           final String... edgeType ) {
+    public void increment(
+            final ApplicationScope scope, final Shard shard,
+            final long count, final DirectedEdgeMeta directedEdgeMeta  ) {
 
 
-        final ShardKey key = new ShardKey( scope, nodeId, shardId, edgeType );
+        final ShardKey key = new ShardKey( scope, shard, directedEdgeMeta );
 
         readLock.lock();
 
@@ -102,10 +119,9 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
 
 
     @Override
-    public long getCount( final ApplicationScope scope, final Id nodeId, final long shardId,
-                          final String... edgeType ) {
+    public long getCount( final ApplicationScope scope, final Shard shard, final DirectedEdgeMeta directedEdgeMeta ) {
 
-        final ShardKey key = new ShardKey( scope, nodeId, shardId, edgeType );
+        final ShardKey key = new ShardKey( scope, shard, directedEdgeMeta );
 
 
         readLock.lock();
@@ -115,9 +131,6 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
         try {
             count = currentCounter.get( key );
 
-            if ( flushPending != null ) {
-                count += flushPending.get( key );
-            }
         }
         finally {
             readLock.unlock();
@@ -130,78 +143,121 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
 
 
     @Override
-    public void flush() {
+    public void beginFlush() {
 
         writeLockLock.lock();
 
         try {
-            flushPending = currentCounter;
-            currentCounter = new Counter();
+            //hand the active counter to the flush queue without blocking; offer returns false if it was not accepted
+            final boolean queued = flushQueue.offer( currentCounter );
+
+            /**
+             * Only swap in a fresh counter once the old one is queued; otherwise keep using the current counter
+             */
+            if ( queued ) {
+                currentCounter = new Counter();
+            }
         }
         finally {
             writeLockLock.unlock();
         }
+    }
 
 
-        //copy to the batch outside of the command for performance
-        final MutationBatch batch = nodeShardCounterSerialization.flush( flushPending );
+    @Override
+    public boolean flushPending() {
+        return !flushQueue.isEmpty() || worker.isFlushing();
+    }
 
-        /**
-         * Execute the command in hystrix to avoid slamming cassandra
-         */
-        new HystrixCommand( HystrixCassandra.ASYNC_GROUP ) {
 
-            @Override
-            protected Void run() throws Exception {
-                /**
-                 * Execute the batch asynchronously
-                 */
-                batch.execute();
+    /**
+     * Begin a flush when the current counter has outlived the flush interval or reached the flush invoke count
+     */
+    private void checkFlush() {
 
-                return null;
-            }
+        //flush once create time + interval is no longer in the future, or once the invoke count hits the limit
+        if ( currentCounter.getCreateTimestamp() + graphFig.getCounterFlushInterval() <= timeService.getCurrentTime()
+                || currentCounter.getInvokeCount() >= graphFig.getCounterFlushCount() ) {
+            beginFlush();
+        }
+    }
 
 
-            @Override
-            protected Object getFallback() {
-                //we've failed to mutate.  Merge this count back into the current one
-                currentCounter.merge( flushPending );
+    /**
+     * Worker that loops forever, taking queued counters one at a time and executing the mutation batch for each
+     */
+    private static class FlushWorker implements Action0 {
 
-                return null;
-            }
-        }.execute();
+        private final BlockingQueue<Counter> counterQueue;
+        private final NodeShardCounterSerialization nodeShardCounterSerialization;
 
-        writeLockLock.lock();
+        private volatile Counter rollUp;
 
-        try {
-            flushPending = null;
-        }
-        finally {
-            writeLockLock.unlock();
+
+        private FlushWorker( final BlockingQueue<Counter> counterQueue,
+                             final NodeShardCounterSerialization nodeShardCounterSerialization ) {
+            this.counterQueue = counterQueue;
+            this.nodeShardCounterSerialization = nodeShardCounterSerialization;
         }
-    }
 
 
-    /**
-     * Check if we need to flush.  If we do, perform the flush
-     */
-    private void checkFlush() {
+        @Override
+        public void call() {
 
-        //there's no flush pending and we're past the timeout or count
-        if ( flushPending == null && (
-                currentCounter.getCreateTimestamp() + graphFig.getCounterFlushInterval() > timeService.getCurrentTime()
-                        || currentCounter.getInvokeCount() >= graphFig.getCounterFlushCount() ) ) {
 
+            while ( true ) {
+                /**
+                 * Block until a counter is queued.  rollUp is cleared first so it is null while we wait
+                 */
+
+                try {
+                    rollUp = null;
+                    rollUp = counterQueue.take();
+                }
+                catch ( InterruptedException e ) {
+                    LOG.error( "Unable to read from counter queue", e );
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException( "Unable to read from counter queue", e );
-            /**
-             * Fire the flush action asynchronously
-             */
-            Schedulers.immediate().createWorker().schedule( new Action0() {
-                @Override
-                public void call() {
-                    flush();
                 }
-            } );
+
+
+
+
+                //copy to the batch outside of the command for performance
+                final MutationBatch batch = nodeShardCounterSerialization.flush( rollUp );
+
+                /**
+                 * Execute the command in hystrix to avoid slamming cassandra
+                 */
+                new HystrixCommand( HystrixCassandra.ASYNC_GROUP ) {
+
+                    @Override
+                    protected Void run() throws Exception {
+                        batch.execute();
+
+                        return null;
+                    }
+
+
+                    @Override
+                    protected Object getFallback() {
+                        //we've failed to mutate.  Re-queue the counter for a retry; the offer's result is not checked
+                        counterQueue.offer( rollUp );
+
+                        return null;
+                    }
+                }.execute();
+            }
+
+        }
+
+
+        /**
+         * Return true if we're in the process of flushing
+         * @return true while a counter taken from the queue is held in rollUp, false while waiting on the queue
+         */
+        public boolean isFlushing(){
+            return rollUp != null;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
index 41eb525..4b05401 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
@@ -27,7 +27,7 @@ import com.netflix.astyanax.MutationBatch;
 /**
  * Serialization for flushing and reading counters
  */
-public interface NodeShardCounterSerialization  extends Migration {
+public interface NodeShardCounterSerialization extends Migration {
 
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
index da318bf..6b99e93 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
@@ -29,15 +29,20 @@ import org.apache.cassandra.db.marshal.CounterColumnType;
 
 import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
 import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.astyanax.OrganizationScopedRowKeySerializer;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeRowKey;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeRowKeySerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeShardRowKeySerializer;
+import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
@@ -48,20 +53,23 @@ import com.netflix.astyanax.connectionpool.OperationResult;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
 import com.netflix.astyanax.model.Column;
-import com.netflix.astyanax.serializers.LongSerializer;
-
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+import com.netflix.astyanax.serializers.BooleanSerializer;
 
 
 @Singleton
 public class NodeShardCounterSerializationImpl implements NodeShardCounterSerialization {
 
 
+    private static final ShardKeySerializer SHARD_KEY_SERIALIZER = new ShardKeySerializer();
+
     /**
      * Edge shards
      */
-    private static final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> EDGE_SHARD_COUNTS =
+    private static final MultiTennantColumnFamily<ApplicationScope, ShardKey, Boolean> EDGE_SHARD_COUNTS =
             new MultiTennantColumnFamily<>( "Edge_Shard_Counts",
-                    new OrganizationScopedRowKeySerializer<>( new EdgeRowKeySerializer() ), LongSerializer.get() );
+                    new OrganizationScopedRowKeySerializer<>( SHARD_KEY_SERIALIZER ), BooleanSerializer.get() );
 
 
     protected final Keyspace keyspace;
@@ -92,12 +100,11 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
             final ShardKey key = entry.getKey();
             final long value = entry.getValue().get();
 
-            final EdgeRowKey edgeRowKey = new EdgeRowKey( key.getNodeId(), key.getEdgeTypes() );
 
-            final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.getScope(), edgeRowKey );
+            final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.scope, key );
 
 
-            batch.withRow( EDGE_SHARD_COUNTS, rowKey ).incrementCounterColumn( key.getShardId(), value );
+            batch.withRow( EDGE_SHARD_COUNTS, rowKey ).incrementCounterColumn(true , value );
         }
 
 
@@ -108,14 +115,12 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
     @Override
     public long getCount( final ShardKey key ) {
 
-        final EdgeRowKey edgeRowKey = new EdgeRowKey( key.getNodeId(), key.getEdgeTypes() );
-
-        final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.getScope(), edgeRowKey );
+        final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.scope, key );
 
 
         try {
-            OperationResult<Column<Long>> column =
-                    keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( key.getShardId() ).execute();
+            OperationResult<Column<Boolean>> column =
+                    keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( true ).execute();
 
             return column.getResult().getLongValue();
         }
@@ -133,7 +138,50 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
     public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
         return Collections.singleton(
                 new MultiTennantColumnFamilyDefinition( EDGE_SHARD_COUNTS, BytesType.class.getSimpleName(),
-                        ColumnTypes.LONG_TYPE_REVERSED, CounterColumnType.class.getSimpleName(),
-                        MultiTennantColumnFamilyDefinition.CacheOption.KEYS ) );
+                        ColumnTypes.BOOLEAN, CounterColumnType.class.getSimpleName(),
+                        MultiTennantColumnFamilyDefinition.CacheOption.ALL ) );
     }
+
+
+    /** Serializes a ShardKey as: application id, directed edge meta, shard index, shard created time. */
+    private static class ShardKeySerializer implements CompositeFieldSerializer<ShardKey> {
+
+
+        private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+        private static final EdgeShardRowKeySerializer EDGE_SHARD_ROW_KEY_SERIALIZER = EdgeShardRowKeySerializer.INSTANCE;
+
+        /** Writes the key's fields to the builder; fromComposite reads them back in the same order. */
+        @Override
+        public void toComposite( final CompositeBuilder builder, final ShardKey key ) {
+            //only the application id of the scope is written
+            ID_SER.toComposite( builder, key.scope.getApplication() );
+
+            EDGE_SHARD_ROW_KEY_SERIALIZER.toComposite( builder, key.directedEdgeMeta );
+
+            builder.addLong( key.shard.getShardIndex() );
+
+            builder.addLong( key.shard.getCreatedTime() );
+        }
+
+        /** Rebuilds a ShardKey by reading the fields in the order toComposite wrote them. */
+        @Override
+        public ShardKey fromComposite( final CompositeParser composite ) {
+
+            final Id applicationId = ID_SER.fromComposite( composite );
+
+            final ApplicationScope scope = new ApplicationScopeImpl( applicationId );
+
+            final DirectedEdgeMeta directedEdgeMeta = EDGE_SHARD_ROW_KEY_SERIALIZER.fromComposite( composite );
+
+            final long shardIndex = composite.readLong();
+
+            final long shardCreatedTime = composite.readLong();
+            //NOTE(review): the third Shard argument is not serialized and is always false here - confirm this is intended
+            return new ShardKey( scope, new Shard( shardIndex, shardCreatedTime, false ), directedEdgeMeta );
+        }
+
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
index 63c87d3..c976210 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
@@ -19,27 +19,24 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
 
 
-import java.util.Arrays;
-
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 
 
 /**
  * Key for shards and counts
  */
 public class ShardKey {
-    private final ApplicationScope scope;
-    private final Id nodeId;
-    private final long shardId;
-    private final String[] edgeTypes;
+    public final ApplicationScope scope;
+    public final Shard shard;
+    public final DirectedEdgeMeta directedEdgeMeta;
 
 
-    public ShardKey( final ApplicationScope scope, final Id nodeId, final long shardId, final String... edgeTypes ) {
+    public ShardKey( final ApplicationScope scope, final Shard shard, final DirectedEdgeMeta directedEdgeMeta ) {
         this.scope = scope;
-        this.nodeId = nodeId;
-        this.shardId = shardId;
-        this.edgeTypes = edgeTypes;
+        this.shard = shard;
+        this.directedEdgeMeta = directedEdgeMeta;
     }
 
 
@@ -54,16 +51,13 @@ public class ShardKey {
 
         final ShardKey shardKey = ( ShardKey ) o;
 
-        if ( shardId != shardKey.shardId ) {
-            return false;
-        }
-        if ( !Arrays.equals( edgeTypes, shardKey.edgeTypes ) ) {
+        if ( !directedEdgeMeta.equals( shardKey.directedEdgeMeta ) ) {
             return false;
         }
-        if ( !nodeId.equals( shardKey.nodeId ) ) {
+        if ( !scope.equals( shardKey.scope ) ) {
             return false;
         }
-        if ( !scope.equals( shardKey.scope ) ) {
+        if ( !shard.equals( shardKey.shard ) ) {
             return false;
         }
 
@@ -71,32 +65,11 @@ public class ShardKey {
     }
 
 
-    public ApplicationScope getScope() {
-        return scope;
-    }
-
-
-    public Id getNodeId() {
-        return nodeId;
-    }
-
-
-    public long getShardId() {
-        return shardId;
-    }
-
-
-    public String[] getEdgeTypes() {
-        return edgeTypes;
-    }
-
-
     @Override
     public int hashCode() {
         int result = scope.hashCode();
-        result = 31 * result + nodeId.hashCode();
-        result = 31 * result + ( int ) ( shardId ^ ( shardId >>> 32 ) );
-        result = 31 * result + Arrays.hashCode( edgeTypes );
+        result = 31 * result + shard.hashCode();
+        result = 31 * result + directedEdgeMeta.hashCode();
         return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKey.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKey.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKey.java
deleted file mode 100644
index f4d8467..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKey.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-
-
-import org.apache.usergrid.persistence.graph.serialization.util.EdgeHasher;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- * Used to store row keys by sourceId, targetId and edgeType
- */
-public class EdgeRowKey {
-    public final Id targetId;
-    public final long[] edgeTypesHash;
-
-
-    public EdgeRowKey( final Id rowId, final String[] edgeTypes ) {
-        this( rowId, EdgeHasher.createEdgeHash( edgeTypes ) );
-    }
-
-
-    public EdgeRowKey( final Id rowId, final long[] hash ) {
-        this.targetId = rowId;
-        this.edgeTypesHash = hash;
-    }
-}
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java
index cc615c1..90b264c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
 import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.netflix.astyanax.model.CompositeBuilder;
@@ -30,6 +31,7 @@ import com.netflix.astyanax.model.CompositeParser;
 /**
  * Class to perform serialization for row keys from edges
  */
+
 public class EdgeRowKeySerializer implements CompositeFieldSerializer<EdgeRowKey> {
 
     private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
@@ -39,10 +41,10 @@ public class EdgeRowKeySerializer implements CompositeFieldSerializer<EdgeRowKey
     public void toComposite( final CompositeBuilder builder, final EdgeRowKey key ) {
 
         //add the row id to the composite
+        ID_SER.toComposite( builder, key.sourceId );
+        builder.addString( key.edgeType );
         ID_SER.toComposite( builder, key.targetId );
-
-        builder.addLong( key.edgeTypesHash[0] );
-        builder.addLong( key.edgeTypesHash[1] );
+        builder.addLong( key.shardId );
     }
 
 
@@ -50,9 +52,12 @@ public class EdgeRowKeySerializer implements CompositeFieldSerializer<EdgeRowKey
     public EdgeRowKey fromComposite( final CompositeParser composite ) {
 
         final Id sourceId = ID_SER.fromComposite( composite );
+        final String edgeType = composite.readString();
+        final Id targetId = ID_SER.fromComposite( composite );
+        final long shard = composite.readLong();
 
-        final long[] hash = { composite.readLong(), composite.readLong() };
-
-        return new EdgeRowKey( sourceId, hash );
+        return new EdgeRowKey( sourceId, edgeType, targetId, shard );
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
new file mode 100644
index 0000000..b7cc25d
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
@@ -0,0 +1,143 @@
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.netflix.astyanax.Serializer;
+import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.util.RangeBuilder;
+
+
+/**
+ * Base class for edge searches.  It parses result columns into T and, as an iterator, yields the scoped row keys
+ * to read for each ShardEntryGroup in turn.  hasNext() returns true while shard entry groups remain.
+ *
+ * @param <R> The row type
+ * @param <C> The column type
+ * @param <T> The parsed return type
+ */
+public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Comparator<T>,
+        Iterator<List<ScopedRowKey<ApplicationScope, R>>> {
+
+    protected final Optional<Edge> last;
+    protected final long maxTimestamp;
+    protected final ApplicationScope scope;
+    protected final Iterator<ShardEntryGroup> shards;
+
+    /** Creates a searcher over the given shard entry groups; the iterator must have at least one element. */
+    protected EdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional<Edge> last,
+                            final Iterator<ShardEntryGroup> shards ) {
+
+        Preconditions.checkArgument(shards.hasNext(), "Cannot search with no possible shards");
+
+        this.scope = scope;
+        this.maxTimestamp = maxTimestamp;
+        this.last = last;
+        this.shards = shards;
+    }
+
+    /** Returns true while there are shard entry groups left to generate row keys for. */
+    @Override
+    public boolean hasNext() {
+        return shards.hasNext();
+    }
+
+    /** Consumes the next shard entry group and returns one scoped row key per read shard in it. */
+    @Override
+    public List<ScopedRowKey<ApplicationScope, R>> next() {
+        final Collection<Shard> readShards = shards.next().getReadShards();
+
+        final List<ScopedRowKey<ApplicationScope, R>> rowKeys = new ArrayList<>( readShards.size() );
+
+        for ( final Shard shard : readShards ) {
+            //only the shard index is used to build the row key
+            final ScopedRowKey<ApplicationScope, R> rowKey = ScopedRowKey
+                    .fromKey( scope, generateRowKey( shard.getShardIndex() ) );
+
+            rowKeys.add( rowKey );
+        }
+
+
+        return rowKeys;
+    }
+
+    /** Removal is not supported by this iterator. */
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException( "Remove is unsupported" );
+    }
+
+
+    /**
+     * Set the start of the range when a last edge was supplied; otherwise the builder is left untouched
+     */
+    public void setRange( final RangeBuilder builder ) {
+
+        //nothing to resume from, so there is no start column to set
+        if ( !last.isPresent() ) {
+            return;
+        }
+
+        //set our start range since it was supplied to us
+        final C sourceEdge = getStartColumn( last.get() );
+
+        builder.setStart( sourceEdge, getSerializer() );
+    }
+
+    /**
+     * Returns true when a last edge was supplied to resume from
+     */
+    public boolean hasPage() {
+        return last.isPresent();
+    }
+
+    /** Parses a column by handing its name and boolean value to createEdge. */
+    @Override
+    public T parseColumn( final Column<C> column ) {
+        final C edge = column.getName();
+
+        return createEdge( edge, column.getBooleanValue() );
+    }
+
+
+    /**
+     * Get the column's serializer
+     */
+    protected abstract Serializer<C> getSerializer();
+
+
+    /**
+     * Create a row key for this search to use
+     *
+     * @param shard The index of the shard to use in the row key
+     */
+    protected abstract R generateRowKey( final long shard );
+
+
+    /**
+     * Get the column to start searching from, derived from the last edge that was provided
+     */
+    protected abstract C getStartColumn( final Edge last );
+
+
+    /**
+     * Create the parsed value to return to the user from a column
+     *
+     * @param column The column name
+     * @param marked The boolean value of the column, i.e. the marked flag
+     */
+    protected abstract T createEdge( final C column, final boolean marked );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java
new file mode 100644
index 0000000..d93f679
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java
@@ -0,0 +1,77 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.nio.ByteBuffer;
+
+import org.apache.usergrid.persistence.core.astyanax.IdColDynamicCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Preconditions;
+import com.netflix.astyanax.model.DynamicComposite;
+import com.netflix.astyanax.serializers.AbstractSerializer;
+import com.netflix.astyanax.serializers.LongSerializer;
+
+
+/**
+ * Serializes a DirectedEdge as a dynamic composite of its timestamp followed by its id.  The edge type is not
+ * part of the serialized form, so de-serialization yields only the id and timestamp.
+ */
+public class EdgeSerializer extends AbstractSerializer<DirectedEdge> {
+
+    private static final IdColDynamicCompositeSerializer ID_COL_SERIALIZER = IdColDynamicCompositeSerializer.get();
+    private static final LongSerializer LONG_SERIALIZER = LongSerializer.get();
+
+    /** Writes the timestamp first, then the id. */
+    @Override
+    public ByteBuffer toByteBuffer( final DirectedEdge edge ) {
+
+        DynamicComposite composite = new DynamicComposite();
+
+        composite.addComponent( edge.timestamp, LONG_SERIALIZER );
+
+        ID_COL_SERIALIZER.toComposite( composite, edge.id );
+
+        return composite.serialize();
+    }
+
+    /** Reads back the timestamp and id from a buffer; the composite must have exactly 3 components. */
+    @Override
+    public DirectedEdge fromByteBuffer( final ByteBuffer byteBuffer ) {
+        DynamicComposite composite = DynamicComposite.fromByteBuffer( byteBuffer );
+        //presumably 1 timestamp component plus 2 written for the id - verify against IdColDynamicCompositeSerializer
+        Preconditions.checkArgument( composite.size() == 3, "Composite should have 3 elements" );
+
+
+        //the timestamp is the first component
+        final long timestamp = composite.get( 0, LONG_SERIALIZER );
+
+
+        //the id is parsed starting at component index 1
+        final Id id = ID_COL_SERIALIZER.fromComposite( composite, 1 );
+
+
+        return new DirectedEdge( id, timestamp );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardRowKeySerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardRowKeySerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardRowKeySerializer.java
new file mode 100644
index 0000000..0451d68
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardRowKeySerializer.java
@@ -0,0 +1,103 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+
+
+public class EdgeShardRowKeySerializer implements CompositeFieldSerializer<DirectedEdgeMeta> {
+    // Writes a DirectedEdgeMeta into a composite row key and parses it back out again.
+    private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+    // Shared instance, referenced as EdgeShardRowKeySerializer.INSTANCE.
+    public static final EdgeShardRowKeySerializer INSTANCE = new EdgeShardRowKeySerializer();
+
+    // Layout written: meta type, node count, (id, node type) per node, edge type count, then each edge type.
+    @Override
+    public void toComposite( final CompositeBuilder builder, final DirectedEdgeMeta meta ) {
+
+
+        final DirectedEdgeMeta.NodeMeta[] nodeMeta = meta.getNodes();
+
+        //write the storage value of the meta type first
+        builder.addInteger( meta.getType().getStorageValue() );
+
+        final int length = nodeMeta.length;
+        // node count, so fromComposite knows how many (id, node type) pairs follow
+        builder.addInteger( length );
+
+        // each node: its id, then the storage value of its node type
+        for ( DirectedEdgeMeta.NodeMeta node : nodeMeta ) {
+            ID_SER.toComposite( builder, node.getId() );
+            builder.addInteger( node.getNodeType().getStorageValue() );
+        }
+
+        final String[] edgeTypes = meta.getTypes();
+        // edge type count, followed by each edge type string
+        builder.addInteger( edgeTypes.length );
+
+        for ( String type : edgeTypes ) {
+            builder.addString( type );
+        }
+    }
+
+    // Reads the fields in the same order toComposite writes them and rebuilds the meta via fromStorage.
+    @Override
+    public DirectedEdgeMeta fromComposite( final CompositeParser composite ) {
+
+
+        final int storageType = composite.readInteger();
+        // map the stored integer back to its MetaType
+        final DirectedEdgeMeta.MetaType metaType = DirectedEdgeMeta.MetaType.fromStorage( storageType );
+
+        final int idLength = composite.readInteger();
+
+        final DirectedEdgeMeta.NodeMeta[] nodePairs = new DirectedEdgeMeta.NodeMeta[idLength];
+
+        // read idLength (id, node type) pairs
+        for ( int i = 0; i < idLength; i++ ) {
+            final Id sourceId = ID_SER.fromComposite( composite );
+
+            final NodeType type = NodeType.get( composite.readInteger() );
+
+            nodePairs[i] = new DirectedEdgeMeta.NodeMeta( sourceId, type );
+        }
+
+        // remaining fields: the edge type count and then that many type strings
+        final int length = composite.readInteger();
+
+        String[] types = new String[length];
+
+        for ( int i = 0; i < length; i++ ) {
+            types[i] = composite.readString();
+        }
+
+        return  DirectedEdgeMeta.fromStorage( metaType, nodePairs, types );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
index a08ec3b..8233d0d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
@@ -26,19 +26,21 @@ import java.util.Iterator;
 
 import org.apache.cassandra.db.marshal.BytesType;
 
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
+import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.astyanax.OrganizationScopedRowKeySerializer;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
-import org.apache.usergrid.persistence.core.astyanax.OrganizationScopedRowKeySerializer;
-import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
-import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
-import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -58,14 +60,12 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
     /**
      * Edge shards
      */
-    private static final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> EDGE_SHARDS =
+    private static final MultiTennantColumnFamily<ApplicationScope, DirectedEdgeMeta, Long> EDGE_SHARDS =
             new MultiTennantColumnFamily<>( "Edge_Shards",
-                    new OrganizationScopedRowKeySerializer<>( new EdgeRowKeySerializer() ), LongSerializer.get() );
-
+                    new OrganizationScopedRowKeySerializer<>( EdgeShardRowKeySerializer.INSTANCE ), LongSerializer.get() );
 
-    private static final byte HOLDER = 0x00;
 
-    private static final LongColumnParser COLUMN_PARSER = new LongColumnParser();
+    private static final ShardColumnParser COLUMN_PARSER = new ShardColumnParser();
 
 
     protected final Keyspace keyspace;
@@ -83,30 +83,39 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
 
 
     @Override
-    public MutationBatch writeEdgeMeta( final ApplicationScope scope, final Id nodeId, final long shard,
-                                        final String... types ) {
-
+    public MutationBatch writeShardMeta( final ApplicationScope scope,
+                                         final Shard shard,   final DirectedEdgeMeta metaData) {
 
         ValidationUtils.validateApplicationScope( scope );
-        ValidationUtils.verifyIdentity(nodeId);
-        Preconditions.checkArgument( shard > -1, "shardId must be greater than -1" );
-        Preconditions.checkNotNull( types );
+        GraphValidation.validateDirectedEdgeMeta( metaData );
+
+        Preconditions.checkNotNull( metaData, "metadata must be present" );
 
-        final EdgeRowKey key = new EdgeRowKey( nodeId, types );
+        Preconditions.checkNotNull( shard );
+        Preconditions.checkArgument( shard.getShardIndex() > -1, "shardid must be greater than -1" );
+        Preconditions.checkArgument( shard.getCreatedTime() > -1, "createdTime must be greater than -1" );
 
-        final ScopedRowKey rowKey = ScopedRowKey.fromKey( scope, key );
+        final ScopedRowKey rowKey = ScopedRowKey.fromKey( scope, metaData );
 
         final MutationBatch batch = keyspace.prepareMutationBatch();
 
-        batch.withRow( EDGE_SHARDS, rowKey ).putColumn( shard, HOLDER );
+        batch.withTimestamp( shard.getCreatedTime() ).withRow( EDGE_SHARDS, rowKey )
+             .putColumn( shard.getShardIndex(), shard.isCompacted() );
 
         return batch;
     }
 
 
     @Override
-    public Iterator<Long> getEdgeMetaData( final ApplicationScope scope, final Id nodeId, final Optional<Long> start,
-                                           final String... types ) {
+    public Iterator<Shard> getShardMetaData( final ApplicationScope scope,
+                                             final Optional<Shard> start,   final DirectedEdgeMeta metaData  ) {
+
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateDirectedEdgeMeta( metaData );
+
+
+        Preconditions.checkNotNull( metaData, "metadata must be present" );
+
         /**
          * If the edge is present, we need to being seeking from this
          */
@@ -114,15 +123,16 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
         final RangeBuilder rangeBuilder = new RangeBuilder().setLimit( graphFig.getScanPageSize() );
 
         if ( start.isPresent() ) {
-            rangeBuilder.setStart( start.get() );
+            final Shard shard = start.get();
+            GraphValidation.valiateShard( shard );
+            rangeBuilder.setStart( shard.getShardIndex() );
         }
 
-        final EdgeRowKey key = new EdgeRowKey( nodeId, types );
 
-        final ScopedRowKey rowKey = ScopedRowKey.fromKey( scope, key );
+        final ScopedRowKey rowKey = ScopedRowKey.fromKey( scope, metaData );
 
 
-        final RowQuery<ScopedRowKey<ApplicationScope, EdgeRowKey>, Long> query =
+        final RowQuery<ScopedRowKey<ApplicationScope, DirectedEdgeMeta>, Long> query =
                 keyspace.prepareQuery( EDGE_SHARDS ).setConsistencyLevel( cassandraConfig.getReadCL() ).getKey( rowKey )
                         .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
 
@@ -132,21 +142,20 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
 
 
     @Override
-    public MutationBatch removeEdgeMeta( final ApplicationScope scope, final Id nodeId, final long shard,
-                                         final String... types ) {
+    public MutationBatch removeShardMeta( final ApplicationScope scope,
+                                          final Shard shard,   final DirectedEdgeMeta metaData) {
 
         ValidationUtils.validateApplicationScope( scope );
-              ValidationUtils.verifyIdentity(nodeId);
-              Preconditions.checkArgument( shard > -1, "shard must be greater than -1" );
-              Preconditions.checkNotNull( types );
+        GraphValidation.valiateShard( shard );
+        GraphValidation.validateDirectedEdgeMeta( metaData );
+
 
-        final EdgeRowKey key = new EdgeRowKey( nodeId, types );
 
-        final ScopedRowKey rowKey = ScopedRowKey.fromKey( scope, key );
+        final ScopedRowKey rowKey = ScopedRowKey.fromKey( scope, metaData );
 
         final MutationBatch batch = keyspace.prepareMutationBatch();
 
-        batch.withRow( EDGE_SHARDS, rowKey ).deleteColumn( shard );
+        batch.withRow( EDGE_SHARDS, rowKey ).deleteColumn( shard.getShardIndex() );
 
         return batch;
     }
@@ -163,11 +172,15 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
     }
 
 
-    private static class LongColumnParser implements ColumnParser<Long, Long> {
+
+
+
+
+    private static class ShardColumnParser implements ColumnParser<Long, Shard> {
 
         @Override
-        public Long parseColumn( final Column<Long> column ) {
-            return column.getName();
+        public Shard parseColumn( final Column<Long> column ) {
+            return new Shard( column.getName(), column.getTimestamp(), column.getBooleanValue() );
         }
     }
 }