You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/14 17:20:24 UTC

[5/6] Implemented New rolling shard algorithm. This should allow eventual shard consistency between all clients without the need for an external locking allocation system

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
new file mode 100644
index 0000000..89e46d9
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
@@ -0,0 +1,405 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.SearchByIdType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * A bean to define directed edge meta data.  This is used to encapsulate the meta data around a source or target node,
+ * and the types used for grouping them.
+ */
+public abstract class DirectedEdgeMeta {
+
+
+    protected final NodeMeta[] nodes;
+    protected final String[] types;
+
+
+    private DirectedEdgeMeta( NodeMeta[] nodes, String[] types ) {
+        this.nodes = nodes;
+        this.types = types;
+    }
+
+
+    public NodeMeta[] getNodes() {
+        return nodes;
+    }
+
+
+    public String[] getTypes() {
+        return types;
+    }
+
+
+    /**
+     * Inner class to represent node meta data
+     */
+    public static class NodeMeta {
+        private final Id id;
+        private final NodeType nodeType;
+
+
+        public NodeMeta( final Id id, final NodeType nodeType ) {
+            this.id = id;
+            this.nodeType = nodeType;
+        }
+
+
+        public Id getId() {
+            return id;
+        }
+
+
+        public NodeType getNodeType() {
+            return nodeType;
+        }
+    }
+
+
+    @Override
+    public boolean equals( final Object o ) {
+        if ( this == o ) {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() ) {
+            return false;
+        }
+
+        final DirectedEdgeMeta that = ( DirectedEdgeMeta ) o;
+
+        if ( !Arrays.equals( nodes, that.nodes ) ) {
+            return false;
+        }
+        if ( !Arrays.equals( types, that.types ) ) {
+            return false;
+        }
+
+        return true;
+    }
+
+
+    @Override
+    public int hashCode() {
+        int result = Arrays.hashCode( nodes );
+        result = 31 * result + Arrays.hashCode( types );
+        return result;
+    }
+
+
+    /**
+     * Given the edge serialization, load all shards in the shard group
+     */
+    public abstract Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
+                                                    final EdgeColumnFamilies edgeColumnFamilies,
+                                                    final ApplicationScope scope, final ShardEntryGroup group,
+                                                    final long maxValue );
+
+    /**
+     * Get the type of this directed edge
+     */
+    public abstract MetaType getType();
+
+
+    public enum MetaType {
+        SOURCE( 0 ),
+        SOURCETARGET( 1 ),
+        TARGET( 2 ),
+        TARGETSOURCE( 3 ),
+        VERSIONS( 4 );
+
+        private final int storageValue;
+
+
+        MetaType( final int storageValue ) {this.storageValue = storageValue;}
+
+
+        public int getStorageValue() {
+            return storageValue;
+        }
+
+
+        /**
+         * Get the MetaType mapped to the given storage value (null if no mapping exists)
+         */
+        public static MetaType fromStorage( final int ordinal ) {
+            return mappings.get( ordinal );
+        }
+
+
+        private static Map<Integer, MetaType> mappings = new HashMap<Integer, MetaType>();
+
+
+        static {
+
+            for ( MetaType meta : MetaType.values() ) {
+                mappings.put( meta.storageValue, meta );
+            }
+        }
+    }
+
+
+    /**
+     * Create directed edge meta data from a source node
+     */
+    public static DirectedEdgeMeta fromSourceNode( final Id sourceId, final String edgeType ) {
+        return fromSourceNode(
+                new DirectedEdgeMeta.NodeMeta[] { new DirectedEdgeMeta.NodeMeta( sourceId, NodeType.SOURCE ) },
+                new String[] { edgeType } );
+    }
+
+
+    /**
+     * Return meta data from the source node by edge type
+     */
+    private static DirectedEdgeMeta fromSourceNode( final NodeMeta[] nodes, final String[] types ) {
+
+        return new DirectedEdgeMeta( nodes, types ) {
+
+            @Override
+            public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
+                                                   final EdgeColumnFamilies edgeColumnFamilies,
+                                                   final ApplicationScope scope, final ShardEntryGroup group,
+                                                   final long maxValue ) {
+
+                final Id sourceId = nodes[0].id;
+                final String edgeType = types[0];
+
+                final SearchByEdgeType search = new SimpleSearchByEdgeType( sourceId, edgeType, maxValue, null );
+
+                return serialization.getEdgesFromSource( edgeColumnFamilies, scope, search,
+                        Collections.singleton( group ).iterator() );
+            }
+
+
+            @Override
+            public MetaType getType() {
+                return MetaType.SOURCE;
+            }
+        };
+    }
+
+
+    /**
+     * Return meta data that represents a source node with edge type and target type
+     */
+    public static DirectedEdgeMeta fromSourceNodeTargetType( final Id sourceId, final String edgeType,
+                                                             final String targetType ) {
+        return fromSourceNodeTargetType(
+                new DirectedEdgeMeta.NodeMeta[] { new DirectedEdgeMeta.NodeMeta( sourceId, NodeType.SOURCE ) },
+                new String[] { edgeType, targetType } );
+    }
+
+
+    /**
+     * Return meta data that represents a source node with edge type and target type
+     */
+    private static DirectedEdgeMeta fromSourceNodeTargetType( NodeMeta[] nodes, String[] types ) {
+        return new DirectedEdgeMeta( nodes, types ) {
+
+            @Override
+            public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
+                                                   final EdgeColumnFamilies edgeColumnFamilies,
+                                                   final ApplicationScope scope, final ShardEntryGroup group,
+                                                   final long maxValue ) {
+
+                final Id sourceId = nodes[0].id;
+                final String edgeType = types[0];
+                final String targetType = types[1];
+
+                final SearchByIdType search =
+                        new SimpleSearchByIdType( sourceId, edgeType, maxValue, targetType, null );
+
+                return serialization.getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, search,
+                        Collections.singleton( group ).iterator() );
+            }
+
+
+            @Override
+            public MetaType getType() {
+                return MetaType.SOURCETARGET;
+            }
+        };
+    }
+
+
+    public static DirectedEdgeMeta fromTargetNode( final Id targetId, final String edgeType ) {
+        return fromTargetNode(
+                new DirectedEdgeMeta.NodeMeta[] { new DirectedEdgeMeta.NodeMeta( targetId, NodeType.TARGET ) },
+                new String[] { edgeType } );
+    }
+
+
+    /**
+     * Return meta data that represents a target node by edge type
+     */
+    private static DirectedEdgeMeta fromTargetNode( final NodeMeta[] nodes, final String[] types ) {
+        return new DirectedEdgeMeta( nodes, types ) {
+
+            @Override
+            public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
+                                                   final EdgeColumnFamilies edgeColumnFamilies,
+                                                   final ApplicationScope scope, final ShardEntryGroup group,
+                                                   final long maxValue ) {
+
+
+                final Id targetId = nodes[0].id;
+                final String edgeType = types[0];
+
+                final SearchByEdgeType search = new SimpleSearchByEdgeType( targetId, edgeType, maxValue, null );
+
+                return serialization.getEdgesToTarget( edgeColumnFamilies, scope, search,
+                        Collections.singleton( group ).iterator() );
+            }
+
+
+            @Override
+            public MetaType getType() {
+                return MetaType.TARGET;
+            }
+        };
+    }
+
+
+    public static DirectedEdgeMeta fromTargetNodeSourceType( final Id targetId, final String edgeType,
+                                                             final String sourceType ) {
+        return fromTargetNodeSourceType(
+                new DirectedEdgeMeta.NodeMeta[] { new DirectedEdgeMeta.NodeMeta( targetId, NodeType.TARGET ) },
+                new String[] { edgeType, sourceType } );
+    }
+
+
+    /**
+     * Return meta data that represents a target node and a source node type
+     */
+    private static DirectedEdgeMeta fromTargetNodeSourceType( final NodeMeta[] nodes, final String[] types ) {
+        return new DirectedEdgeMeta( nodes, types ) {
+
+            @Override
+            public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
+                                                   final EdgeColumnFamilies edgeColumnFamilies,
+                                                   final ApplicationScope scope, final ShardEntryGroup group,
+                                                   final long maxValue ) {
+
+                final Id targetId = nodes[0].id;
+                final String edgeType = types[0];
+                final String sourceType = types[1];
+
+
+                final SearchByIdType search =
+                        new SimpleSearchByIdType( targetId, edgeType, maxValue, sourceType, null );
+
+                return serialization.getEdgesToTargetBySourceType( edgeColumnFamilies, scope, search,
+                        Collections.singleton( group ).iterator() );
+            }
+
+
+            @Override
+            public MetaType getType() {
+                return MetaType.TARGETSOURCE;
+            }
+        };
+    }
+
+
+    /**
+     * Return meta data that represents an entire edge
+     */
+    public static DirectedEdgeMeta fromEdge( final Id sourceId, final Id targetId, final String edgeType ) {
+        return fromEdge( new DirectedEdgeMeta.NodeMeta[] {
+                new DirectedEdgeMeta.NodeMeta( sourceId, NodeType.SOURCE ),
+                new DirectedEdgeMeta.NodeMeta( targetId, NodeType.TARGET )
+        }, new String[] { edgeType } );
+    }
+
+
+    /**
+     * Return meta data that represents an entire edge
+     */
+    private static DirectedEdgeMeta fromEdge( final NodeMeta[] nodes, final String[] types ) {
+        return new DirectedEdgeMeta( nodes, types ) {
+
+            @Override
+            public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
+                                                   final EdgeColumnFamilies edgeColumnFamilies,
+                                                   final ApplicationScope scope, final ShardEntryGroup group,
+                                                   final long maxValue ) {
+
+                final Id sourceId = nodes[0].id;
+                final Id targetId = nodes[1].id;
+                final String edgeType = types[0];
+
+                final SimpleSearchByEdge search =
+                        new SimpleSearchByEdge( sourceId, edgeType, targetId, maxValue, null );
+
+                return serialization.getEdgeVersions( edgeColumnFamilies, scope, search,
+                        Collections.singleton( group ).iterator() );
+            }
+
+
+            @Override
+            public MetaType getType() {
+                return MetaType.VERSIONS;
+            }
+        };
+    }
+
+
+    /**
+     * Create a directed edge from the stored meta data
+     * @param metaType The meta type stored
+     * @param nodes The metadata of the nodes
+     * @param types The types in the meta data
+     *
+     *
+     */
+    public static DirectedEdgeMeta fromStorage( final  MetaType metaType, final NodeMeta[] nodes, final String[] types ) {
+        switch ( metaType ) {
+            case SOURCE:
+                return fromSourceNode( nodes, types );
+            case SOURCETARGET:
+                return fromSourceNodeTargetType( nodes, types );
+            case TARGET:
+                return fromTargetNode( nodes, types );
+            case TARGETSOURCE:
+                return fromTargetNodeSourceType( nodes, types );
+            case VERSIONS:
+                return fromEdge(nodes, types);
+            default:
+                throw new UnsupportedOperationException( "No supported meta type found" );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeColumnFamilies.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeColumnFamilies.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeColumnFamilies.java
new file mode 100644
index 0000000..6f6c72d
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeColumnFamilies.java
@@ -0,0 +1,60 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.migration.Migration;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+
+/**
+ * Implementation for using multiple column families
+ */
+public interface EdgeColumnFamilies extends Migration{
+
+    /**
+     * Get the column family for getting source nodes
+     */
+    public MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> getSourceNodeCfName();
+
+    /**
+     * Get the column family for getting target nodes
+     */
+    public MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> getTargetNodeCfName();
+
+
+    /**
+     * Get the column family for getting source nodes with a target type
+     */
+    public MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> getSourceNodeTargetTypeCfName();
+
+    /**
+     * Get the column family for getting target nodes with a source type
+     */
+    public MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> getTargetNodeSourceTypeCfName();
+
+    /**
+     * Get the Graph edge versions cf
+     * @return
+     */
+    public MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> getGraphEdgeVersions();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeRowKey.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeRowKey.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeRowKey.java
new file mode 100644
index 0000000..d7982d6
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeRowKey.java
@@ -0,0 +1,44 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Used to store row keys by sourceId, targetId and edgeType
+ */
+public class EdgeRowKey {
+    public final Id sourceId;
+    public final Id targetId;
+    public final String edgeType;
+    public final long shardId;
+
+
+    public EdgeRowKey( final Id sourceId, final String edgeType, final Id targetId, final long shardId ) {
+        this.sourceId = sourceId;
+        this.targetId = targetId;
+        this.edgeType = edgeType;
+        this.shardId = shardId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
index d49cfdf..81dcb39 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
@@ -38,31 +38,28 @@ public interface EdgeShardSerialization extends Migration{
     /**
      * Write a new time shard for the meta data
      * @param scope The scope to write
-     * @param nodeId The id in the edge
-     * @param shard The next time to write
-     * @param types The types to write to.  Can be edge type, or edgeType+id type
+     * @param shard The shard to write
+     * @param directedEdgeMeta The edge meta data to use
      */
-    public MutationBatch writeEdgeMeta(ApplicationScope scope, Id nodeId, long shard,  String... types);
+    public MutationBatch writeShardMeta( ApplicationScope scope, Shard shard, DirectedEdgeMeta directedEdgeMeta );
 
     /**
      * Get an iterator of all meta data and types.  Returns a range from High to low
      * @param scope The organization scope
-     * @param nodeId The id of the node
      * @param start The shard time to start seeking from.  Values <= this value will be returned.
-     * @param types The types to use
+     * @param directedEdgeMeta The edge meta data to use
      * @return
      */
-    public Iterator<Long> getEdgeMetaData(ApplicationScope scope, Id nodeId, Optional<Long> start,  String... types);
+    public Iterator<Shard> getShardMetaData( ApplicationScope scope, Optional<Shard> start,  DirectedEdgeMeta directedEdgeMeta);
 
     /**
      * Remove the shard from the edge meta data from the types.
 
-     * @param scope
-     * @param nodeId
-     * @param shard
-     * @param types
+     * @param scope The scope of the application
+     * @param shard The shard to remove
+     * @param directedEdgeMeta The edge meta data to use
      * @return
      */
-    public MutationBatch removeEdgeMeta(ApplicationScope scope, Id nodeId, long shard, String... types);
+    public MutationBatch removeShardMeta( ApplicationScope scope, Shard shard,  DirectedEdgeMeta directedEdgeMeta );
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
index 09436ac..10f3794 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
@@ -21,7 +21,6 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard;
 
 
 import java.util.Iterator;
-import java.util.UUID;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -32,62 +31,31 @@ public interface EdgeShardStrategy {
     /**
      * Get the shard key used for writing this shard.  CUD operations should use this
      *
-     * @param scope The application's scope
-     * @param rowKeyId The id being used in the row key
+     * @param scope The application's scope
      * @param timestamp The timestamp on the edge
-     * @param types The types in the edge
      */
-    public long getWriteShard(final ApplicationScope scope, final Id rowKeyId, final  long timestamp, final String... types );
+    public ShardEntryGroup getWriteShards( final ApplicationScope scope, final long timestamp, final DirectedEdgeMeta directedEdgeMeta );
 
 
     /**
      * Get the iterator of all shards for this entity
      *
      * @param scope The application scope
-     * @param rowKeyId The id used in the row key
      * @param maxTimestamp The max timestamp to use
-     * @param types the types in the edge
      */
-    public Iterator<Long> getReadShards(final ApplicationScope scope,final  Id rowKeyId, final long maxTimestamp,final  String... types );
+    public Iterator<ShardEntryGroup> getReadShards(final ApplicationScope scope, final long maxTimestamp, final DirectedEdgeMeta directedEdgeMeta );
 
     /**
      * Increment our count meta data by the passed value.  Can be a positive or a negative number.
      * @param scope The scope in the application
-     * @param rowKeyId The row key id
-     * @param shardId The shard id to use
+     * @param shard The shard to use
      * @param count The amount to increment or decrement
-     * @param types The types
+     * @param directedEdgeMeta The edge meta data to use
      * @return
      */
-    public void increment(final ApplicationScope scope,final  Id rowKeyId, long shardId, long count ,final  String... types );
+    public void increment(final ApplicationScope scope, Shard shard, long count, final DirectedEdgeMeta directedEdgeMeta );
 
 
-    /**
-     * Get the name of the column family for getting source nodes
-     */
-    public String getSourceNodeCfName();
-
-    /**
-     * Get the name of the column family for getting target nodes
-     */
-    public String getTargetNodeCfName();
-
-
-    /**
-     * Get the name of the column family for getting source nodes  with a target type
-     */
-    public String getSourceNodeTargetTypeCfName();
-
-    /**
-     * Get the name of the column family for getting target nodes with a source type
-     */
-    public String getTargetNodeSourceTypeCfName();
-
-    /**
-     * Get the Graph edge versions cf
-     * @return
-     */
-    public String getGraphEdgeVersions();
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
index 1097ced..75bdc74 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
@@ -36,27 +36,31 @@ public interface NodeShardAllocation {
 
 
     /**
-     * Get all shards for the given info.  If none exist, a default shard should be allocated
+     * Get all shards for the given info.  If none exist, a default shard should be allocated.  The nodeId is the source node
      *
      * @param scope The application scope
-     * @param nodeId
      * @param maxShardId The max value to start seeking from.  Values <= this will be returned if specified
-     * @param edgeTypes
+     * @param directedEdgeMeta The directed edge metadata to use
      * @return A list of all shards <= the current shard.  This will always return 0l if no shards are allocated
      */
-    public Iterator<Long> getShards( final ApplicationScope scope, final Id nodeId, Optional<Long> maxShardId,
-                                     final String... edgeTypes );
+    public Iterator<ShardEntryGroup> getShards( final ApplicationScope scope, Optional<Shard> maxShardId, final DirectedEdgeMeta directedEdgeMeta );
 
 
     /**
      * Audit our highest shard for it's maximum capacity.  If it has reached the max capacity <=, it will allocate a new shard
      *
      * @param scope The app scope
-     * @param nodeId The node id
-     * @param edgeType The edge types
+     * @param shardEntryGroup The shard operator to use
+     * @param directedEdgeMeta The directed edge metadata to use
      * @return True if a new shard was allocated
      */
-    public boolean auditMaxShard(final ApplicationScope scope, final Id nodeId, final String... edgeType);
+    public boolean auditShard(final ApplicationScope scope, final ShardEntryGroup shardEntryGroup, final DirectedEdgeMeta directedEdgeMeta);
+
+    /**
+     * Get the minimum time that a created shard should be considered "new", and be used for both new writes and reads
+     * @return
+     */
+    public long getMinTime();
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
index 90503d4..dbe645f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
@@ -34,24 +34,34 @@ public interface NodeShardApproximation {
      * Increment the shard Id the specified amount
      *
      * @param scope The scope
-     * @param nodeId The node id
-     * @param shardId The shard id
-     * @param count
-     * @param edgeType The edge type
+     * @param shard The shard to use
+     * @param count The count to increment
+     * @param directedEdgeMeta The directed edge meta data to use
      */
-    public void increment( final ApplicationScope scope, final Id nodeId, final long shardId,  final long count,
-                           final String... edgeType );
+    public void increment( final ApplicationScope scope, final Shard shard,
+                           final long count, final DirectedEdgeMeta directedEdgeMeta );
 
 
     /**
      * Get the approximation of the number of unique items
+     *
+     * @param scope The scope
+     * @param directedEdgeMeta The directed edge meta data to use
      */
-    public long getCount( final ApplicationScope scope, final Id nodeId, final long shardId,
-                          final String... edgeType );
+    public long getCount( final ApplicationScope scope, final Shard shard,  final DirectedEdgeMeta directedEdgeMeta );
 
 
     /**
-     * Flush the current counters in the Approximation
+     * Flush the current counters in the Approximation.  Will return immediately after the flush. You can then use flushPending
+     * to check the state.
      */
-    public void flush();
+    public void beginFlush();
+
+    /**
+     * Return true if there is data to be flushed
+     * @return
+     */
+    public boolean flushPending();
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
index 667fdbf..58924af 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
@@ -20,7 +20,6 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard;
 
 
 import java.util.Iterator;
-import java.util.UUID;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -35,21 +34,22 @@ public interface NodeShardCache {
 
 
     /**
-     * Get the time meta data for the given node
-     * @param nodeId
+     * Get the shard for the given timestamp
+     * @param scope The scope for the application
      * @param timestamp The time to select the slice for.
-     * @param edgeType
+     * @param directedEdgeMeta The directed edge meta data
      */
-    public long getSlice(final ApplicationScope scope, final Id nodeId, final long timestamp, final String... edgeType);
+    public ShardEntryGroup getWriteShardGroup( final ApplicationScope scope,
+                                               final long timestamp, final DirectedEdgeMeta directedEdgeMeta );
 
     /**
-     * Get an iterator of all versions <= the version
-     * @param scope
-     * @param nodeId
+     * Get an iterator of all versions <= the version for iterating shard entry sets.  The iterator of groups will be ordered
+     * highest to lowest.  I.E range scanning from Long.MAX_VALUE to 0
+     * @param scope The scope for the application
      * @param maxTimestamp The highest timestamp
-     * @param edgeType
+     * @param directedEdgeMeta The directed edge meta data
      * @return
      */
-    public Iterator<Long> getVersions(final ApplicationScope scope, final Id nodeId, final long  maxTimestamp, final String... edgeType);
+    public Iterator<ShardEntryGroup> getReadShardGroup( final ApplicationScope scope, final long maxTimestamp, final DirectedEdgeMeta directedEdgeMeta  );
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeType.java
new file mode 100644
index 0000000..62c2f11
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeType.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.HashMap;
+
+
+/**
+ * The node type of the source or target
+ */
+public enum NodeType {
+    SOURCE( 0 ),
+    TARGET( 1 );
+
+    private final int ordinal;
+
+
+    private NodeType( final int ordinal ) {this.ordinal = ordinal;}
+
+
+    public int getStorageValue() {
+        return ordinal;
+    }
+
+
+    /**
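+     * Get the type from its storage value
+     * @param storageValue The value previously returned by getStorageValue
+     * @return The matching type, or null if no type uses that storage value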
+     */
+    public static NodeType get(final int storageValue){
+     return types.get( storageValue );
+    }
+
+
+    /**
+     * Internal map and initialization for fast access
+     */
+    private static final HashMap<Integer, NodeType> types;
+
+
+    static{
+        types = new HashMap<>();
+
+        for(NodeType type: NodeType.values()){
+            types.put( type.getStorageValue(), type );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java
new file mode 100644
index 0000000..9895978
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java
@@ -0,0 +1,54 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import org.apache.usergrid.persistence.graph.serialization.util.EdgeHasher;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Class that represents an edge row key
+ */
+public class RowKey {
+    public final Id nodeId;
+    public final long[] hash;
+    public final long shardId;
+
+
+    /**
+     * Create a row key with the node and the edgeType
+     */
+    public RowKey( Id nodeId, String edgeType, final long shardId ) {
+        this( nodeId, EdgeHasher.createEdgeHash( edgeType ), shardId );
+    }
+
+
+    /**
+     * Create a new row key with the hash, should only be used in deserialization or internal callers.
+     */
+    public RowKey( Id nodeId, long[] hash, final long shardId ) {
+        this.nodeId = nodeId;
+        this.hash = hash;
+        this.shardId = shardId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java
new file mode 100644
index 0000000..5705eb3
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java
@@ -0,0 +1,60 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import org.apache.usergrid.persistence.graph.serialization.util.EdgeHasher;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * The row key with the additional type
+ */
+public class RowKeyType extends RowKey {
+
+    /**
+     * Create a row key with the node id in the row key, the edge type, and the type from the typeid
+     *
+     * @param nodeId The node id in the row key
+     * @param edgeType The type of the edge
+     * @param typeId The type of the id
+     */
+    public RowKeyType( final Id nodeId, final String edgeType, final Id typeId, final long shardId ) {
+        this( nodeId, edgeType, typeId.getType(), shardId );
+    }
+
+
+    /**
+     * Create a row key with the node id in the row key, the edge type, and the target type from the id
+     */
+    public RowKeyType( final Id nodeId, final String edgeType, final String targetType, final long shardId ) {
+        super( nodeId, EdgeHasher.createEdgeHash( edgeType, targetType ), shardId );
+    }
+
+
+    /**
+     * Internal use in de-serializing.  Should only be used in this case or by internal callers
+     */
+    public RowKeyType( final Id nodeId, final long[] hash, final long shardId ) {
+        super( nodeId, hash, shardId );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
new file mode 100644
index 0000000..38fe51c
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+public class Shard implements Comparable<Shard> {
+
+    private final long shardIndex;
+    private final long createdTime;
+    private final boolean compacted;
+
+
+    public Shard( final long shardIndex, final long createdTime, final boolean compacted ) {
+        this.shardIndex = shardIndex;
+        this.createdTime = createdTime;
+        this.compacted = compacted;
+    }
+
+
+    /**
+     * Get the long shard index
+     */
+    public long getShardIndex() {
+        return shardIndex;
+    }
+
+
+    /**
+     * Get the timestamp in epoch millis this shard was created
+     */
+    public long getCreatedTime() {
+        return createdTime;
+    }
+
+
+    /**
+     * Return true if this shard has been compacted
+     */
+    public boolean isCompacted() {
+        return compacted;
+    }
+
+
+    /**
+     * Compare the shards based on the shard index first, then the created time, then the compacted flag
+     */
+    @Override
+    public int compareTo( final Shard o ) {
+        if ( o == null ) {
+            return 1;
+        }
+
+        if ( shardIndex > o.shardIndex ) {
+            return 1;
+        }
+
+        else if ( shardIndex == o.shardIndex ) {
+            if ( createdTime > o.createdTime ) {
+                return 1;
+            }
+            else if ( createdTime < o.createdTime ) {
+                return -1;
+            }
+
+            else {
+
+                //kind of arbitrary compacted takes precedence
+                if ( compacted && !o.compacted ) {
+                    return 1;
+                }
+
+                else if ( !compacted && o.compacted ){
+                    return -1;
+                }
+
+
+            }
+            return 0;
+        }
+
+        return -1;
+    }
+
+
+    @Override
+    public boolean equals( final Object o ) {
+        if ( this == o ) {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() ) {
+            return false;
+        }
+
+        final Shard shard = ( Shard ) o;
+
+        if ( compacted != shard.compacted ) {
+            return false;
+        }
+        if ( createdTime != shard.createdTime ) {
+            return false;
+        }
+        if ( shardIndex != shard.shardIndex ) {
+            return false;
+        }
+
+        return true;
+    }
+
+
+    @Override
+    public int hashCode() {
+        int result = ( int ) ( shardIndex ^ ( shardIndex >>> 32 ) );
+        result = 31 * result + ( int ) ( createdTime ^ ( createdTime >>> 32 ) );
+        result = 31 * result + ( compacted ? 1 : 0 );
+        return result;
+    }
+
+
+    /** Render every field that participates in equals, so unequal shards never print identically */
+    @Override
+    public String toString() {
+        return "Shard{" + "shardIndex=" + shardIndex +
+                ", createdTime=" + createdTime +
+                ", compacted=" + compacted + "} ";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
new file mode 100644
index 0000000..6e82cbc
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * There are cases where we need to read or write to more than 1 shard.  This object encapsulates a set of shards that
+ * should be written to and read from.  All reads should combine the data sets from all shards in the group, and writes
+ * should be written to each shard.  Once the shard can safely be compacted a background process should be triggered to
+ * remove additional shards and make seeks faster.  This multiread/write should only occur during the time period of the
+ * delta (in milliseconds), after which the next read will asynchronously compact the shards into a single shard.
+ */
+public class ShardEntryGroup {
+
+
+    private List<Shard> shards;
+
+    private final long delta;
+
+    private long maxCreatedTime;
+
+    private Shard compactionTarget;
+
+
+    /**
+     * The max delta we accept in milliseconds for create time to be considered a member of this group
+     */
+    public ShardEntryGroup( final long delta ) {
+        Preconditions.checkArgument( delta > 0, "delta must be greater than 0" );
+        this.delta = delta;
+        this.shards = new ArrayList<>();
+        this.maxCreatedTime = 0;
+    }
+
+
+    /**
+     * Only add a shard if it meets the rules required to join this group.  Shards must be added from highest to lowest.
+     *
+     * Case 1) First shard in the group, always added
+     *
+     * Case 2) The last shard added is not compacted.  The shard is added, the group has no lower bound yet
+     *
+     * Case 3) The last shard added is compacted.  It is the lower bound of this group, so the shard is rejected
+     * @return true if the shard was added to this group, false if it was rejected
+     */
+    public boolean addShard( final Shard shard ) {
+
+        Preconditions.checkNotNull( shard, "shard cannot be null" );
+
+        final int size = shards.size();
+
+        if ( size == 0 ) {
+            addShardInternal( shard );
+            return true;
+        }
+
+        final Shard minShard = shards.get( size - 1 );
+
+        Preconditions.checkArgument( minShard.compareTo( shard ) > 0, "shard must be less than the current max" );
+
+        //the current min shard is not compacted, so there is no lower bound yet.  Include the shard in this group
+        if ( !minShard.isCompacted() ) {
+            addShardInternal( shard );
+            return true;
+        }
+
+
+        return false;
+    }
+
+
+    /**
+     * Add the shard and update the max created time
+     */
+    private void addShardInternal( final Shard shard ) {
+        shards.add( shard );
+
+        maxCreatedTime = Math.max( maxCreatedTime, shard.getCreatedTime() );
+
+        //we're changing our structure, unset the compaction target
+        compactionTarget = null;
+    }
+
+
+    /**
+     * Return the minimum shard based on time indexes
+     */
+    public Shard getMinShard() {
+        final int size = shards.size();
+
+        if ( size < 1 ) {
+            return null;
+        }
+
+        return shards.get( size - 1 );
+    }
+
+
+    /**
+     * Get the entries that we should read from.
+     */
+    public Collection<Shard> getReadShards() {
+        return shards;
+    }
+
+
+    /**
+     * Get the entries, with the max shard time being first. We write to all shards until they're migrated
+     */
+    public Collection<Shard> getWriteShards( long currentTime ) {
+
+        /**
+         * The shards in this set can be combined, we should only write to the compaction target to avoid
+         * adding data to other shards
+         */
+        if ( !isTooSmallToCompact() && shouldCompact( currentTime ) ) {
+            return Collections.singleton( getCompactionTarget() );
+        }
+
+
+        return shards;
+    }
+
+
+    /**
+     * Return true if we have a pending compaction
+     */
+    public boolean isCompactionPending() {
+        return !isTooSmallToCompact();
+    }
+
+
+    /**
+     * Get the shard all compactions should write to.  Null indicates we cannot find a shard that could be used as a
+     * compaction target.  Note that this shard may not have surpassed the delta yet You should invoke "shouldCompact"
+     * first to ensure all criteria are met before initiating compaction
+     */
+    public Shard getCompactionTarget() {
+
+        if ( compactionTarget != null ) {
+            return compactionTarget;
+        }
+
+
+        //we have < 2 shards, we can't compact
+        if ( isTooSmallToCompact() ) {
+            return null;
+        }
+
+
+        final int lastIndex = shards.size() - 1;
+
+        final Shard last = shards.get( lastIndex );
+
+        //Our oldest isn't compacted. As a result we have no "bookend" to delimit this entry group.  Therefore we
+        // can't compact
+        if ( !last.isCompacted() ) {
+            return null;
+        }
+
+        //Start seeking from the end of our group.  The first shard we encounter that is not compacted is our
+        // compaction target
+        //NOTE: This does not mean we can compact, rather it's just an indication that we have a target set.
+        for ( int i = lastIndex - 1; i > -1; i-- ) {
+            final Shard compactionCandidate = shards.get( i );
+
+
+            if ( !compactionCandidate.isCompacted() ) {
+                compactionTarget = compactionCandidate;
+                break;
+            }
+        }
+
+        return compactionTarget;
+    }
+
+
+    /**
+     * Return the number of entries in this shard group
+     */
+    public int entrySize() {
+        return shards.size();
+    }
+
+
+    /**
+     * Return true if there are not enough elements in this entry group to consider compaction
+     */
+    private boolean isTooSmallToCompact() {
+        return shards.size() < 2;
+    }
+
+
+    /**
+     * Returns true if a compaction target exists and the newest created shard is older than currentTime - delta
+     *
+     * @param currentTime The current system time in milliseconds
+     *
+     * @return True if these shards can safely be combined into a single shard, false otherwise
+     */
+    public boolean shouldCompact( final long currentTime ) {
+
+        /**
+         * We don't have enough shards to compact, ignore
+         */
+        return getCompactionTarget() != null
+
+
+                /**
+                 * If something was created within the delta time frame, not everyone may have seen it due to
+                 * cache refresh, we can't compact yet.
+                 */
+
+                && currentTime - delta > maxCreatedTime;
+    }
+
+
+    /**
+     * Return true if this shard can be deleted AFTER all of the data in it has been moved
+     */
+    public boolean canBeDeleted( final Shard shard ) {
+        //if we're a neighbor shard (n-1) or the target compaction shard, we can't be deleted
+        //we purposefully use shard index comparison over .equals here, since 2 shards might have the same index with
+        // different timestamps
+        // (unlikely but could happen)
+
+        final Shard compactionTarget = getCompactionTarget();
+
+
+        return !shard.isCompacted() && ( compactionTarget != null && compactionTarget.getShardIndex() != shard
+                .getShardIndex() );
+    }
+
+
+    /**
+     * Helper method to create a shard entry group with a single shard
+     */
+    public static ShardEntryGroup singletonGroup( final Shard shard, final long delta ) {
+        ShardEntryGroup group = new ShardEntryGroup( delta );
+        group.addShard( shard );
+
+        return group;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
new file mode 100644
index 0000000..13c5596
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -0,0 +1,43 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import rx.Observable;
+
+
+/**
+ * Defines tasks for running compaction
+ *
+ *
+ */
+public interface ShardGroupCompaction {
+
+
+    /**
+     * Execute the compaction task.  Will return the number of edges that have been compacted
+     * @param group The shard entry group to compact
+     * @return The number of edges that are now compacted into the target shard
+     */
+    public Observable<Integer> compact(ShardEntryGroup group);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardOperator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardOperator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardOperator.java
new file mode 100644
index 0000000..895c6d6
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardOperator.java
@@ -0,0 +1,43 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.Iterator;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+
+
+/**
+ *
+ * Strategy for performing shard operations based on their shard types
+ *
+ */
+public interface ShardOperator {
+
+     /**
+     * Get the edges for this operator.  Search
+     * @return
+     */
+    public Iterator<MarkedEdge> getEdges(final ApplicationScope scope, final Shard shard, final long maxValue);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
new file mode 100644
index 0000000..d1ad18d
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.SearchByIdType;
+
+import com.netflix.astyanax.MutationBatch;
+
+
+/**
+ * Performs serialization on the shards
+ */
+public interface ShardedEdgeSerialization {
+
+    /**
+     * EdgeWrite both the source--->Target edge and the target <----- source edge into the mutation
+     *
+     * @param columnFamilies The column families to use
+     * @param scope The org scope of the graph
+     * @param markedEdge The edge to write
+     * @param timestamp The timestamp to use
+     */
+    MutationBatch writeEdge( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+                             UUID timestamp );
+
+    /**
+     * Delete both the source -->target edge and the target<--- source edge in the mutation
+     *
+     * @param columnFamilies The column families to use
+     * @param scope The org scope of the graph
+     * @param markedEdge The edge to delete
+     * @param timestamp The timestamp of the uuid
+     */
+    MutationBatch deleteEdge( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+                              UUID timestamp );
+
+
+    /**
+     * Search for all versions of this edge < the search version.  Returns all versions
+     *
+     * @param columnFamilies The column families to use
+     * @param scope The application scope
+     * @param search The search criteria
+     * @param shards The shards to iterate when searching
+     */
+    Iterator<MarkedEdge> getEdgeVersions( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+                                          SearchByEdge search, Iterator<ShardEntryGroup> shards );
+
+    /**
+     * Get an iterator of all edges by edge type originating from source node
+     *
+     * @param columnFamilies The column families to use
+     * @param scope The application scope
+     * @param search The search criteria
+     * @param shards The shards to iterate when searching
+     */
+    Iterator<MarkedEdge> getEdgesFromSource( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+                                             SearchByEdgeType search, Iterator<ShardEntryGroup> shards );
+
+
+    /**
+     * Get an iterator of all edges by edge type originating from source node.  Also filters by target node id type
+     *
+     * @param columnFamilies The column families to use
+     * @param scope The application scope
+     * @param search The search criteria
+     * @param shards The shards to iterate when searching
+     */
+    Iterator<MarkedEdge> getEdgesFromSourceByTargetType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+                                                         SearchByIdType search, Iterator<ShardEntryGroup> shards );
+
+    /**
+     * Get an iterator of all edges by edge type pointing to the target node.  Returns all versions
+     *
+     * @param columnFamilies The column families to use
+     * @param scope The application scope
+     * @param search The search criteria
+     * @param shards The shards to iterate when searching
+     */
+    Iterator<MarkedEdge> getEdgesToTarget( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+                                           SearchByEdgeType search, Iterator<ShardEntryGroup> shards );
+
+
+    /**
+     * Get an iterator of all edges by edge type pointing to the target node.  Also uses the source id type to limit the
+     * results
+     *
+     * @param columnFamilies The column families to use
+     * @param scope The application scope
+     * @param search The search criteria
+     * @param shards The shards to iterate when searching
+     */
+    Iterator<MarkedEdge> getEdgesToTargetBySourceType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+                                                       SearchByIdType search, Iterator<ShardEntryGroup> shards );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
index 4318200..f5666a2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
@@ -24,6 +24,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.base.Preconditions;
+
 
 /**
  * This class is synchronized for addition.  It is meant to be used across multiple threads
@@ -35,7 +37,7 @@ public class Counter {
     private final AtomicLong invokeCounter;
 
     /**
-     * Pointer to our "current" counter map.  We flush this when time expires or we hit our count
+     * Pointer to our "current" counter map.  We begin flushing this when time expires or we hit our count
      */
     private final ConcurrentHashMap<ShardKey, AtomicLong> counts;
 
@@ -94,6 +96,10 @@ public class Counter {
      * @param other
      */
     public void merge(final Counter other){
+
+        Preconditions.checkNotNull(other, "other cannot be null");
+        Preconditions.checkNotNull( other.counts, "other.counts cannot be null" );
+
         for(Map.Entry<ShardKey, AtomicLong> entry: other.counts.entrySet()){
             add(entry.getKey(), entry.getValue().get());
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
index 7dc763f..b4d88d5 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
@@ -20,15 +20,23 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
 
 
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.inject.Inject;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.netflix.astyanax.MutationBatch;
@@ -40,10 +48,12 @@ import rx.schedulers.Schedulers;
 
 /**
  * Implementation for doing edge approximation based on counters.  Uses a guava loading cache to load values from
- * cassandra, and flush them on cache eviction.
+ * cassandra, and begins flushing them on cache eviction.
  */
 public class NodeShardApproximationImpl implements NodeShardApproximation {
 
+    private static final Logger LOG = LoggerFactory.getLogger(NodeShardApproximationImpl.class);
+
     /**
      * Read write locks to ensure we atomically swap correctly
      */
@@ -63,7 +73,9 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
     /**
      * The counter that is currently in process of flushing to Cassandra.  Can be null
      */
-    private volatile Counter flushPending;
+    private final BlockingQueue<Counter> flushQueue;
+
+    private final FlushWorker worker;
 
 
     /**
@@ -77,15 +89,22 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
         this.nodeShardCounterSerialization = nodeShardCounterSerialization;
         this.timeService = timeService;
         this.currentCounter = new Counter();
+        this.flushQueue = new LinkedBlockingQueue<>( graphFig.getCounterFlushQueueSize() );
+
+        this.worker = new FlushWorker( this.flushQueue, nodeShardCounterSerialization );
+
+        Schedulers.newThread().createWorker().schedule( worker );
+
     }
 
 
     @Override
-    public void increment( final ApplicationScope scope, final Id nodeId, final long shardId, final long count,
-                           final String... edgeType ) {
+    public void increment(
+            final ApplicationScope scope, final Shard shard,
+            final long count, final DirectedEdgeMeta directedEdgeMeta  ) {
 
 
-        final ShardKey key = new ShardKey( scope, nodeId, shardId, edgeType );
+        final ShardKey key = new ShardKey( scope, shard, directedEdgeMeta );
 
         readLock.lock();
 
@@ -102,10 +121,9 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
 
 
     @Override
-    public long getCount( final ApplicationScope scope, final Id nodeId, final long shardId,
-                          final String... edgeType ) {
+    public long getCount( final ApplicationScope scope, final Shard shard, final DirectedEdgeMeta directedEdgeMeta ) {
 
-        final ShardKey key = new ShardKey( scope, nodeId, shardId, edgeType );
+        final ShardKey key = new ShardKey( scope, shard, directedEdgeMeta );
 
 
         readLock.lock();
@@ -115,9 +133,6 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
         try {
             count = currentCounter.get( key );
 
-            if ( flushPending != null ) {
-                count += flushPending.get( key );
-            }
         }
         finally {
             readLock.unlock();
@@ -130,78 +145,121 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
 
 
     @Override
-    public void flush() {
+    public void beginFlush() {
 
         writeLockLock.lock();
 
         try {
-            flushPending = currentCounter;
-            currentCounter = new Counter();
+            //hand the current counter to the flush queue; offer reports false instead of waiting when it is full
+            final boolean queued = flushQueue.offer( currentCounter );
+
+            /**
+             * The counter was queued, so swap in a fresh one.  Otherwise we keep counting in the current counter
+             */
+            if ( queued ) {
+                currentCounter = new Counter();
+            }
         }
         finally {
             writeLockLock.unlock();
         }
+    }
 
 
-        //copy to the batch outside of the command for performance
-        final MutationBatch batch = nodeShardCounterSerialization.flush( flushPending );
+    @Override
+    public boolean flushPending() {
+        return !flushQueue.isEmpty() || worker.isFlushing(); //counters still queued, or one being flushed
+    }
 
-        /**
-         * Execute the command in hystrix to avoid slamming cassandra
-         */
-        new HystrixCommand( HystrixCassandra.ASYNC_GROUP ) {
 
-            @Override
-            protected Void run() throws Exception {
-                /**
-                 * Execute the batch asynchronously
-                 */
-                batch.execute();
+    /**
+     * Begin a flush when the current counter has outlived the flush interval or reached the flush count
+     */
+    private void checkFlush() {
+
-                return null;
-            }
+        //past the timeout means created + interval is no longer in the future, or the invoke count was reached
+        if ( currentCounter.getCreateTimestamp() + graphFig.getCounterFlushInterval() <= timeService.getCurrentTime()
+                || currentCounter.getInvokeCount() >= graphFig.getCounterFlushCount() ) {
+            beginFlush();
+        }
+    }
 
 
-            @Override
-            protected Object getFallback() {
-                //we've failed to mutate.  Merge this count back into the current one
-                currentCounter.merge( flushPending );
+    /**
+     * Worker that repeatedly takes a counter from the queue and executes its flush batch
+     */
+    private static class FlushWorker implements Action0 {
 
-                return null;
-            }
-        }.execute();
+        private final BlockingQueue<Counter> counterQueue;
+        private final NodeShardCounterSerialization nodeShardCounterSerialization;
 
-        writeLockLock.lock();
+        private volatile Counter rollUp;
 
-        try {
-            flushPending = null;
-        }
-        finally {
-            writeLockLock.unlock();
+
+        private FlushWorker( final BlockingQueue<Counter> counterQueue,
+                             final NodeShardCounterSerialization nodeShardCounterSerialization ) {
+            this.counterQueue = counterQueue;
+            this.nodeShardCounterSerialization = nodeShardCounterSerialization;
         }
-    }
 
 
-    /**
-     * Check if we need to flush.  If we do, perform the flush
-     */
-    private void checkFlush() {
+        @Override
+        public void call() {
 
-        //there's no flush pending and we're past the timeout or count
-        if ( flushPending == null && (
-                currentCounter.getCreateTimestamp() + graphFig.getCounterFlushInterval() > timeService.getCurrentTime()
-                        || currentCounter.getInvokeCount() >= graphFig.getCounterFlushCount() ) ) {
 
+            while ( true ) {
+                /**
+                 * Block until a counter is available.  rollUp is null while waiting so isFlushing() reports false
+                 */
+
+                try {
+                    rollUp = null;
+                    rollUp = counterQueue.take();
+                }
+                catch ( InterruptedException e ) {
+                    LOG.error( "Unable to read from counter queue", e );
+                    //restore the interrupt flag before rethrowing so the interruption is not lost
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException( "Unable to read from counter queue", e );
 
-            /**
-             * Fire the flush action asynchronously
-             */
-            Schedulers.immediate().createWorker().schedule( new Action0() {
-                @Override
-                public void call() {
-                    flush();
                 }
-            } );
+
+                //copy to the batch outside of the command for performance
+                final MutationBatch batch = nodeShardCounterSerialization.flush( rollUp );
+
+                /**
+                 * Execute the command in hystrix to avoid slamming cassandra
+                 */
+                new HystrixCommand( HystrixCassandra.ASYNC_GROUP ) {
+
+                    @Override
+                    protected Void run() throws Exception {
+                        batch.execute();
+
+                        return null;
+                    }
+
+                    @Override
+                    protected Object getFallback() {
+                        //we've failed to mutate.  Put the counter back on the queue so the flush is retried
+                        if ( !counterQueue.offer( rollUp ) ) {
+                            LOG.error( "Unable to re-queue counter after a failed flush, its counts are dropped" );
+                        }
+
+                        return null;
+                    }
+                }.execute();
+            }
+
+        }
+
+
+        /**
+         * Return true if we're in the process of flushing
+         * @return true when rollUp currently holds a counter taken from the queue
+         */
+        public boolean isFlushing(){
+            return rollUp != null;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
index 41eb525..4b05401 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
@@ -27,7 +27,7 @@ import com.netflix.astyanax.MutationBatch;
 /**
  * Serialization for flushing and reading counters
  */
-public interface NodeShardCounterSerialization  extends Migration {
+public interface NodeShardCounterSerialization extends Migration {
 
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
index da318bf..6b99e93 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
@@ -29,15 +29,20 @@ import org.apache.cassandra.db.marshal.CounterColumnType;
 
 import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
 import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.astyanax.OrganizationScopedRowKeySerializer;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeRowKey;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeRowKeySerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeShardRowKeySerializer;
+import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
@@ -48,20 +53,23 @@ import com.netflix.astyanax.connectionpool.OperationResult;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
 import com.netflix.astyanax.model.Column;
-import com.netflix.astyanax.serializers.LongSerializer;
-
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+import com.netflix.astyanax.serializers.BooleanSerializer;
 
 
 @Singleton
 public class NodeShardCounterSerializationImpl implements NodeShardCounterSerialization {
 
 
+    private static final ShardKeySerializer SHARD_KEY_SERIALIZER = new ShardKeySerializer();
+
     /**
      * Edge shards
      */
-    private static final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> EDGE_SHARD_COUNTS =
+    private static final MultiTennantColumnFamily<ApplicationScope, ShardKey, Boolean> EDGE_SHARD_COUNTS =
             new MultiTennantColumnFamily<>( "Edge_Shard_Counts",
-                    new OrganizationScopedRowKeySerializer<>( new EdgeRowKeySerializer() ), LongSerializer.get() );
+                    new OrganizationScopedRowKeySerializer<>( SHARD_KEY_SERIALIZER ), BooleanSerializer.get() );
 
 
     protected final Keyspace keyspace;
@@ -92,12 +100,11 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
             final ShardKey key = entry.getKey();
             final long value = entry.getValue().get();
 
-            final EdgeRowKey edgeRowKey = new EdgeRowKey( key.getNodeId(), key.getEdgeTypes() );
 
-            final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.getScope(), edgeRowKey );
+            final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.scope, key );
 
 
-            batch.withRow( EDGE_SHARD_COUNTS, rowKey ).incrementCounterColumn( key.getShardId(), value );
+            batch.withRow( EDGE_SHARD_COUNTS, rowKey ).incrementCounterColumn(true , value );
         }
 
 
@@ -108,14 +115,12 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
     @Override
     public long getCount( final ShardKey key ) {
 
-        final EdgeRowKey edgeRowKey = new EdgeRowKey( key.getNodeId(), key.getEdgeTypes() );
-
-        final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.getScope(), edgeRowKey );
+        final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.scope, key );
 
 
         try {
-            OperationResult<Column<Long>> column =
-                    keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( key.getShardId() ).execute();
+            OperationResult<Column<Boolean>> column =
+                    keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( true ).execute();
 
             return column.getResult().getLongValue();
         }
@@ -133,7 +138,50 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
     public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
         return Collections.singleton(
                 new MultiTennantColumnFamilyDefinition( EDGE_SHARD_COUNTS, BytesType.class.getSimpleName(),
-                        ColumnTypes.LONG_TYPE_REVERSED, CounterColumnType.class.getSimpleName(),
-                        MultiTennantColumnFamilyDefinition.CacheOption.KEYS ) );
+                        ColumnTypes.BOOLEAN, CounterColumnType.class.getSimpleName(),
+                        MultiTennantColumnFamilyDefinition.CacheOption.ALL ) );
     }
+
+
+
+    /**
+     * Composite serializer for shard keys, used as the row key serializer of the shard count column family
+     */
+    private static class ShardKeySerializer implements CompositeFieldSerializer<ShardKey> {
+
+        private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+        private static final EdgeShardRowKeySerializer EDGE_SHARD_ROW_KEY_SERIALIZER = EdgeShardRowKeySerializer.INSTANCE;
+
+
+        /**
+         * Write the application id, the directed edge meta, then the shard index and shard created time
+         */
+        @Override
+        public void toComposite( final CompositeBuilder builder, final ShardKey key ) {
+            final Shard shard = key.shard;
+
+            ID_SER.toComposite( builder, key.scope.getApplication() );
+            EDGE_SHARD_ROW_KEY_SERIALIZER.toComposite( builder, key.directedEdgeMeta );
+            builder.addLong( shard.getShardIndex() );
+            builder.addLong( shard.getCreatedTime() );
+        }
+
+
+        /**
+         * Read the fields back in the order toComposite wrote them.  The Shard is always built with false
+         */
+        @Override
+        public ShardKey fromComposite( final CompositeParser composite ) {
+            final ApplicationScope scope = new ApplicationScopeImpl( ID_SER.fromComposite( composite ) );
+            final DirectedEdgeMeta meta = EDGE_SHARD_ROW_KEY_SERIALIZER.fromComposite( composite );
+
+            final long index = composite.readLong();
+            final long createdTime = composite.readLong();
+            final Shard rebuilt = new Shard( index, createdTime, false );
+
+            return new ShardKey( scope, rebuilt, meta );
+        }
+    }
+
 }