You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/05 00:14:57 UTC

[04/11] git commit: Checkpoint

Checkpoint


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7425ba5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7425ba5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7425ba5d

Branch: refs/heads/USERGRID-188
Commit: 7425ba5df6b79c40c7632824b42e1444b5c91780
Parents: 72f0d7d
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Jul 14 15:01:37 2014 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Jul 14 15:01:37 2014 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/graph/GraphFig.java    |  10 +-
 .../impl/EdgeSerializationImpl.java             |  37 ++--
 .../impl/shard/NodeShardApproximation.java      |  11 +-
 .../serialization/impl/shard/count/Counter.java |   8 +-
 .../shard/count/NodeShardApproximationImpl.java | 169 ++++++++++++-------
 .../impl/shard/count/ShardKey.java              |  56 +++---
 .../shard/impl/EdgeShardSerializationImpl.java  |   2 +-
 .../graph/serialization/util/EdgeHasher.java    |  86 ++++++++++
 .../graph/GraphManagerShardingIT.java           |  13 +-
 .../impl/shard/EdgeShardSerializationTest.java  |  39 +++--
 .../impl/shard/NodeShardAllocationTest.java     |  36 ++--
 .../shard/count/NodeShardApproximationTest.java |  79 +++++----
 .../NodeShardCounterSerializationTest.java      |   7 +-
 13 files changed, 382 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index e0ce45c..f84fef4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -41,9 +41,11 @@ public interface GraphFig extends GuicyFig {
 
     public static final String SHARD_CACHE_TIMEOUT = "usergrid.graph.shard.cache.timeout";
 
-    public static final String COUNTER_WRITE_FLUSH_COUNT = "usergrid.graph.shard.counter.flush.count";
+    public static final String COUNTER_WRITE_FLUSH_COUNT = "usergrid.graph.shard.counter.beginFlush.count";
 
-    public static final String COUNTER_WRITE_FLUSH_INTERVAL = "usergrid.graph.shard.counter.flush.interval";
+    public static final String COUNTER_WRITE_FLUSH_INTERVAL = "usergrid.graph.shard.counter.beginFlush.interval";
+
+    public static final String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size";
 
 
 
@@ -82,5 +84,9 @@ public interface GraphFig extends GuicyFig {
     @Default( "30000" )
     @Key( COUNTER_WRITE_FLUSH_INTERVAL )
     long getCounterFlushInterval();
+
+    @Default( "1000" )
+    @Key(COUNTER_WRITE_FLUSH_QUEUE_SIZE  )
+    int getCounterFlushQueueSize();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index 7c630c1..e49b99d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -55,6 +55,7 @@ import org.apache.usergrid.persistence.graph.SearchByIdType;
 import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntries;
 import org.apache.usergrid.persistence.graph.serialization.util.EdgeHasher;
@@ -211,9 +212,9 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
 
             @Override
-            public void countEdge( final Id rowId, final long shardId, final String... types ) {
+            public void countEdge( final Id rowId, final NodeType nodeType, final long shardId, final String... types ) {
                 if ( !isDeleted ) {
-                    edgeShardStrategy.increment( scope, rowId, shardId, 1l, types );
+                    edgeShardStrategy.increment( scope, rowId, nodeType, shardId, 1l, types );
                 }
             }
 
@@ -250,8 +251,8 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
 
             @Override
-            public void countEdge( final Id rowId, final long shardId, final String... types ) {
-                edgeShardStrategy.increment( scope, rowId, shardId, -1, types );
+            public void countEdge( final Id rowId, final NodeType nodeType,  final long shardId,  final String... types ) {
+                edgeShardStrategy.increment( scope, rowId, nodeType, shardId, -1, types );
             }
 
 
@@ -298,7 +299,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
         final DirectedEdge sourceEdge = new DirectedEdge( targetNodeId, timestamp );
 
-        final ShardEntries sourceRowKeyShard = edgeShardStrategy.getWriteShards( scope, sourceNodeId, timestamp, type );
+        final ShardEntries sourceRowKeyShard = edgeShardStrategy.getWriteShards( scope, sourceNodeId, NodeType.SOURCE, timestamp, type );
 
 
         for ( Shard shard : sourceRowKeyShard.getEntries() ) {
@@ -306,12 +307,12 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
             final long shardId = shard.getShardIndex();
             final RowKey sourceRowKey = new RowKey( sourceNodeId, type, shardId );
             op.writeEdge( sourceNodeEdgesCf, sourceRowKey, sourceEdge );
-            op.countEdge( sourceNodeId, shardId, type );
+            op.countEdge( sourceNodeId, NodeType.SOURCE, shardId, type );
         }
 
 
         final ShardEntries sourceWithTypeRowKeyShard =
-                edgeShardStrategy.getWriteShards( scope, sourceNodeId, timestamp, type, targetNodeType );
+                edgeShardStrategy.getWriteShards( scope, sourceNodeId, NodeType.SOURCE, timestamp, type, targetNodeType );
 
         for ( Shard shard : sourceWithTypeRowKeyShard.getEntries() ) {
 
@@ -319,7 +320,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
             final RowKeyType sourceRowKeyType = new RowKeyType( sourceNodeId, type, targetNodeId, shardId );
 
             op.writeEdge( sourceNodeTargetTypeCf, sourceRowKeyType, sourceEdge );
-            op.countEdge( sourceNodeId, shardId, type, targetNodeType );
+            op.countEdge( sourceNodeId, NodeType.SOURCE, shardId, type, targetNodeType );
         }
 
 
@@ -330,19 +331,19 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
         final DirectedEdge targetEdge = new DirectedEdge( sourceNodeId, timestamp );
 
 
-        final ShardEntries targetRowKeyShard = edgeShardStrategy.getWriteShards( scope, targetNodeId, timestamp, type );
+        final ShardEntries targetRowKeyShard = edgeShardStrategy.getWriteShards( scope, targetNodeId, NodeType.TARGET, timestamp, type );
 
         for ( Shard shard : targetRowKeyShard.getEntries() ) {
             final long shardId = shard.getShardIndex();
             final RowKey targetRowKey = new RowKey( targetNodeId, type, shardId );
 
             op.writeEdge( targetNodeEdgesCf, targetRowKey, targetEdge );
-            op.countEdge( targetNodeId, shardId, type );
+            op.countEdge( targetNodeId, NodeType.TARGET, shardId, type );
         }
 
 
         final ShardEntries targetWithTypeRowKeyShard =
-                edgeShardStrategy.getWriteShards( scope, targetNodeId, timestamp, type, souceNodeType );
+                edgeShardStrategy.getWriteShards( scope, targetNodeId, NodeType.TARGET, timestamp, type, souceNodeType );
 
 
         for ( Shard shard : targetWithTypeRowKeyShard.getEntries() ) {
@@ -353,7 +354,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
 
             op.writeEdge( targetNodeSourceTypeCf, targetRowKeyType, targetEdge );
-            op.countEdge( targetNodeId, shardId, type, souceNodeType );
+            op.countEdge( targetNodeId, NodeType.TARGET, shardId, type, souceNodeType );
         }
 
         /**
@@ -381,7 +382,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
         final EdgeSearcher<EdgeRowKey, Long, MarkedEdge> searcher =
                 new EdgeSearcher<EdgeRowKey, Long, MarkedEdge>( scope, maxTimestamp, search.last(),
-                        edgeShardStrategy.getReadShards( scope, sourceId, maxTimestamp, type ) ) {
+                        edgeShardStrategy.getReadShards( scope, sourceId, NodeType.SOURCE, maxTimestamp, type ) ) {
 
                     @Override
                     protected Serializer<Long> getSerializer() {
@@ -437,7 +438,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
         final EdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
                 new EdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
-                        edgeShardStrategy.getReadShards( scope, sourceId, maxTimestamp, type ) ) {
+                        edgeShardStrategy.getReadShards( scope, sourceId,NodeType.SOURCE, maxTimestamp, type ) ) {
 
 
                     @Override
@@ -482,7 +483,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
         final EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
                 new EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
-                        edgeShardStrategy.getReadShards( scope, targetId, maxTimestamp, type, targetType ) ) {
+                        edgeShardStrategy.getReadShards( scope, targetId, NodeType.TARGET, maxTimestamp, type, targetType ) ) {
 
                     @Override
                     protected Serializer<DirectedEdge> getSerializer() {
@@ -524,7 +525,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
         final EdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
                 new EdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
-                        edgeShardStrategy.getReadShards( scope, targetId, maxTimestamp, type ) ) {
+                        edgeShardStrategy.getReadShards( scope, targetId, NodeType.TARGET, maxTimestamp, type ) ) {
 
                     @Override
                     protected Serializer<DirectedEdge> getSerializer() {
@@ -570,7 +571,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
         final EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
                 new EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
-                        edgeShardStrategy.getReadShards( scope, targetId, maxTimestamp, type, sourceType ) ) {
+                        edgeShardStrategy.getReadShards( scope, targetId, NodeType.TARGET,  maxTimestamp, type, sourceType ) ) {
                     @Override
                     protected Serializer<DirectedEdge> getSerializer() {
                         return EDGE_SERIALIZER;
@@ -994,7 +995,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
         /**
          * Perform the count on the edge
          */
-        void countEdge( final Id rowId, long shardId, String... types );
+        void countEdge( final Id rowId, NodeType type, long shardId,  String... types );
 
         /**
          * Write the edge into the version cf

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
index 311e605..f2f51ef 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
@@ -55,7 +55,14 @@ public interface NodeShardApproximation {
 
 
     /**
-     * Flush the current counters in the Approximation
+     * Flush the current counters in the Approximation.  Will return immediately after the flush. You can then use flushPending
+     * to check the state.
      */
-    public void flush();
+    public void beginFlush();
+
+    /**
+     * Return true if there is data to be flushed
+     * @return
+     */
+    public boolean flushPending();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
index 4318200..f5666a2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
@@ -24,6 +24,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.base.Preconditions;
+
 
 /**
  * This class is synchronized for addition.  It is meant to be used across multiple threads
@@ -35,7 +37,7 @@ public class Counter {
     private final AtomicLong invokeCounter;
 
     /**
-     * Pointer to our "current" counter map.  We flush this when time expires or we hit our count
+     * Pointer to our "current" counter map.  We beginFlush this when time expires or we hit our count
      */
     private final ConcurrentHashMap<ShardKey, AtomicLong> counts;
 
@@ -94,6 +96,10 @@ public class Counter {
      * @param other
      */
     public void merge(final Counter other){
+
+        Preconditions.checkNotNull(other, "other cannot be null");
+        Preconditions.checkNotNull( other.counts, "other.counts cannot be null" );
+
         for(Map.Entry<ShardKey, AtomicLong> entry: other.counts.entrySet()){
             add(entry.getKey(), entry.getValue().get());
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
index c7197b5..e0740fa 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
@@ -20,10 +20,15 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
 
 
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.inject.Inject;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -41,10 +46,12 @@ import rx.schedulers.Schedulers;
 
 /**
  * Implementation for doing edge approximation based on counters.  Uses a guava loading cache to load values from
- * cassandra, and flush them on cache eviction.
+ * cassandra, and beginFlush them on cache eviction.
  */
 public class NodeShardApproximationImpl implements NodeShardApproximation {
 
+    private static final Logger LOG = LoggerFactory.getLogger(NodeShardApproximationImpl.class);
+
     /**
      * Read write locks to ensure we atomically swap correctly
      */
@@ -64,7 +71,9 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
     /**
      * The counter that is currently in process of flushing to Cassandra.  Can be null
      */
-    private volatile Counter flushPending;
+    private final BlockingQueue<Counter> flushQueue;
+
+    private final FlushWorker worker;
 
 
     /**
@@ -78,15 +87,21 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
         this.nodeShardCounterSerialization = nodeShardCounterSerialization;
         this.timeService = timeService;
         this.currentCounter = new Counter();
+        this.flushQueue = new LinkedBlockingQueue<>( graphFig.getCounterFlushQueueSize() );
+
+        this.worker = new FlushWorker( this.flushQueue, nodeShardCounterSerialization );
+
+        Schedulers.newThread().createWorker().schedule( worker );
+
     }
 
 
     @Override
-    public void increment( final ApplicationScope scope, final Id nodeId,  final NodeType nodeType, final long shardId, final long count,
-                           final String... edgeType ) {
+    public void increment( final ApplicationScope scope, final Id nodeId, final NodeType nodeType, final long shardId,
+                           final long count, final String... edgeType ) {
 
 
-        final ShardKey key = new ShardKey( scope, nodeId, shardId, edgeType );
+        final ShardKey key = new ShardKey( scope, nodeId, nodeType, shardId, edgeType );
 
         readLock.lock();
 
@@ -103,10 +118,10 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
 
 
     @Override
-    public long getCount( final ApplicationScope scope, final Id nodeId,  final NodeType nodeType, final long shardId,
+    public long getCount( final ApplicationScope scope, final Id nodeId, final NodeType nodeType, final long shardId,
                           final String... edgeType ) {
 
-        final ShardKey key = new ShardKey( scope, nodeId, shardId, edgeType );
+        final ShardKey key = new ShardKey( scope, nodeId, nodeType, shardId, edgeType );
 
 
         readLock.lock();
@@ -116,9 +131,6 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
         try {
             count = currentCounter.get( key );
 
-            if ( flushPending != null ) {
-                count += flushPending.get( key );
-            }
         }
         finally {
             readLock.unlock();
@@ -131,78 +143,121 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
 
 
     @Override
-    public void flush() {
+    public void beginFlush() {
 
         writeLockLock.lock();
 
         try {
-            flushPending = currentCounter;
-            currentCounter = new Counter();
+
+            final boolean queued = flushQueue.offer( currentCounter );
+
+            /**
+             * We were able to q the beginFlush, swap it
+             */
+            if ( queued ) {
+                currentCounter = new Counter();
+            }
         }
         finally {
             writeLockLock.unlock();
         }
+    }
 
 
-        //copy to the batch outside of the command for performance
-        final MutationBatch batch = nodeShardCounterSerialization.flush( flushPending );
+    @Override
+    public boolean flushPending() {
+        return flushQueue.size() > 0 || worker.isFlushing();
+    }
 
-        /**
-         * Execute the command in hystrix to avoid slamming cassandra
-         */
-        new HystrixCommand( HystrixCassandra.ASYNC_GROUP ) {
 
-            @Override
-            protected Void run() throws Exception {
-                /**
-                 * Execute the batch asynchronously
-                 */
-                batch.execute();
+    /**
+     * Check if we need to beginFlush.  If we do, perform the beginFlush
+     */
+    private void checkFlush() {
 
-                return null;
-            }
+        //there's no beginFlush pending and we're past the timeout or count
+        if ( currentCounter.getCreateTimestamp() + graphFig.getCounterFlushInterval() > timeService.getCurrentTime()
+                || currentCounter.getInvokeCount() >= graphFig.getCounterFlushCount() ) {
+            beginFlush();
+        }
+    }
 
 
-            @Override
-            protected Object getFallback() {
-                //we've failed to mutate.  Merge this count back into the current one
-                currentCounter.merge( flushPending );
+    /**
+     * Worker that will take from the queue
+     */
+    private static class FlushWorker implements Action0 {
 
-                return null;
-            }
-        }.execute();
+        private final BlockingQueue<Counter> counterQueue;
+        private final NodeShardCounterSerialization nodeShardCounterSerialization;
 
-        writeLockLock.lock();
+        private volatile Counter rollUp;
 
-        try {
-            flushPending = null;
-        }
-        finally {
-            writeLockLock.unlock();
+
+        private FlushWorker( final BlockingQueue<Counter> counterQueue,
+                             final NodeShardCounterSerialization nodeShardCounterSerialization ) {
+            this.counterQueue = counterQueue;
+            this.nodeShardCounterSerialization = nodeShardCounterSerialization;
         }
-    }
 
 
-    /**
-     * Check if we need to flush.  If we do, perform the flush
-     */
-    private void checkFlush() {
+        @Override
+        public void call() {
 
-        //there's no flush pending and we're past the timeout or count
-        if ( flushPending == null && (
-                currentCounter.getCreateTimestamp() + graphFig.getCounterFlushInterval() > timeService.getCurrentTime()
-                        || currentCounter.getInvokeCount() >= graphFig.getCounterFlushCount() ) ) {
 
+            while ( true ) {
+                /**
+                 * Block taking the first element.  Once we take this, batch drain and roll up the rest
+                 */
+
+                try {
+                    rollUp = null;
+                    rollUp = counterQueue.take();
+                }
+                catch ( InterruptedException e ) {
+                    LOG.error( "Unable to read from counter queue", e );
+                    throw new RuntimeException( "Unable to read from counter queue", e );
 
-            /**
-             * Fire the flush action asynchronously
-             */
-            Schedulers.immediate().createWorker().schedule( new Action0() {
-                @Override
-                public void call() {
-                    flush();
                 }
-            } );
+
+
+
+
+                //copy to the batch outside of the command for performance
+                final MutationBatch batch = nodeShardCounterSerialization.flush( rollUp );
+
+                /**
+                 * Execute the command in hystrix to avoid slamming cassandra
+                 */
+                new HystrixCommand( HystrixCassandra.ASYNC_GROUP ) {
+
+                    @Override
+                    protected Void run() throws Exception {
+                        batch.execute();
+
+                        return null;
+                    }
+
+
+                    @Override
+                    protected Object getFallback() {
+                        //we've failed to mutate.  Merge this count back into the current one
+                        counterQueue.offer( rollUp );
+
+                        return null;
+                    }
+                }.execute();
+            }
+
+        }
+
+
+        /**
+         * Return true if we're in the process of flushing
+         * @return
+         */
+        public boolean isFlushing(){
+            return rollUp != null;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
index 63c87d3..55a761b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
 import java.util.Arrays;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 
@@ -32,14 +33,43 @@ public class ShardKey {
     private final ApplicationScope scope;
     private final Id nodeId;
     private final long shardId;
+    private final NodeType nodeType;
     private final String[] edgeTypes;
 
 
-    public ShardKey( final ApplicationScope scope, final Id nodeId, final long shardId, final String... edgeTypes ) {
+    public ShardKey( final ApplicationScope scope, final Id nodeId, final NodeType nodeType, final long shardId, final String... edgeTypes ) {
         this.scope = scope;
         this.nodeId = nodeId;
         this.shardId = shardId;
         this.edgeTypes = edgeTypes;
+        this.nodeType = nodeType;
+    }
+
+
+
+
+    public ApplicationScope getScope() {
+        return scope;
+    }
+
+
+    public Id getNodeId() {
+        return nodeId;
+    }
+
+
+    public long getShardId() {
+        return shardId;
+    }
+
+
+    public String[] getEdgeTypes() {
+        return edgeTypes;
+    }
+
+
+    public NodeType getNodeType() {
+        return nodeType;
     }
 
 
@@ -63,6 +93,9 @@ public class ShardKey {
         if ( !nodeId.equals( shardKey.nodeId ) ) {
             return false;
         }
+        if ( nodeType != shardKey.nodeType ) {
+            return false;
+        }
         if ( !scope.equals( shardKey.scope ) ) {
             return false;
         }
@@ -71,31 +104,12 @@ public class ShardKey {
     }
 
 
-    public ApplicationScope getScope() {
-        return scope;
-    }
-
-
-    public Id getNodeId() {
-        return nodeId;
-    }
-
-
-    public long getShardId() {
-        return shardId;
-    }
-
-
-    public String[] getEdgeTypes() {
-        return edgeTypes;
-    }
-
-
     @Override
     public int hashCode() {
         int result = scope.hashCode();
         result = 31 * result + nodeId.hashCode();
         result = 31 * result + ( int ) ( shardId ^ ( shardId >>> 32 ) );
+        result = 31 * result + nodeType.hashCode();
         result = 31 * result + Arrays.hashCode( edgeTypes );
         return result;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
index 1e6614a..5b7c901 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
@@ -104,7 +104,7 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
 
         final MutationBatch batch = keyspace.prepareMutationBatch();
 
-        batch.withRow( EDGE_SHARDS, rowKey ).putColumn( shard, HOLDER ).setTimestamp( timestamp );
+        batch.withTimestamp( timestamp ).withRow( EDGE_SHARDS, rowKey ).putColumn( shard, HOLDER );
 
         return batch;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/util/EdgeHasher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/util/EdgeHasher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/util/EdgeHasher.java
new file mode 100644
index 0000000..14a67fa
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/util/EdgeHasher.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.util;
+
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+
+import org.apache.cassandra.utils.MurmurHash;
+
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ *
+ *
+ */
+public class EdgeHasher {
+
+    private static final String UTF_8 = "UTF-8";
+    private static final Charset CHARSET = Charset.forName( UTF_8 );
+
+
+    /**
+     * Create a hash based on the edge type and the type of the id that will be inserted into the column
+     *
+     *
+     * @param edgeType The name of the edge type
+     * @param idForColumn The id of the value that will be in the column
+     *
+     * @return A hash that represents a consistent one way hash of the fields
+     */
+    public static long[] createEdgeHash( final String edgeType, final Id idForColumn ) {
+
+        return createEdgeHash( edgeType, idForColumn.getType() );
+    }
+
+
+    /**
+     * Create the edge hash from the edge type and id type
+     * @param edgeTypes
+     * @return
+     */
+    public static long[] createEdgeHash(final String... edgeTypes){
+       final StringBuilder hashString =  new StringBuilder();
+
+        for(String edge: edgeTypes){
+            hashString.append(edge);
+        }
+
+        return createEdgeHash( hashString.toString() );
+    }
+
+
+    /**
+     * Create a ash based on the edge type and the type of the id that will be inserted into the column
+     *
+     * @return A hash that represents a consistent one way hash of the fields
+     */
+    public static long[] createEdgeHash( final String edgeType ) {
+
+
+        ByteBuffer key = ByteBuffer.wrap( edgeType.getBytes( CHARSET ) );
+
+        return  MurmurHash.hash3_x64_128( key, key.position(), key.remaining(), 0 );
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
index 01e07b1..adcb42e 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
@@ -35,6 +35,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
@@ -101,7 +102,7 @@ public class GraphManagerShardingIT {
         //each edge causes 4 counts
         final long writeCount = flushCount/4;
 
-        assertTrue( "Shard size must be >= flush Count", maxShardSize >= flushCount );
+        assertTrue( "Shard size must be >= beginFlush Count", maxShardSize >= flushCount );
 
         Id targetId = null;
 
@@ -115,14 +116,14 @@ public class GraphManagerShardingIT {
         }
 
 
-        long shardCount = nodeShardApproximation.getCount( scope, sourceId, 0l, edgeType );
+        long shardCount = nodeShardApproximation.getCount( scope, sourceId, NodeType.SOURCE,  0l, edgeType );
 
         assertEquals("Shard count for source node should be the same as write count", writeCount, shardCount);
 
 
         //now verify it's correct for the target
 
-        shardCount = nodeShardApproximation.getCount( scope, targetId, 0l, edgeType );
+        shardCount = nodeShardApproximation.getCount( scope, targetId, NodeType.TARGET,  0l, edgeType );
 
         assertEquals(1, shardCount);
 
@@ -151,7 +152,7 @@ public class GraphManagerShardingIT {
         //each edge causes 4 counts
         final long writeCount = flushCount/4;
 
-        assertTrue( "Shard size must be >= flush Count", maxShardSize >= flushCount );
+        assertTrue( "Shard size must be >= beginFlush Count", maxShardSize >= flushCount );
 
         Id sourceId = null;
 
@@ -165,14 +166,14 @@ public class GraphManagerShardingIT {
         }
 
 
-        long shardCount = nodeShardApproximation.getCount( scope, targetId, 0l, edgeType );
+        long shardCount = nodeShardApproximation.getCount( scope, targetId, NodeType.TARGET,  0l, edgeType );
 
         assertEquals("Shard count for source node should be the same as write count", writeCount, shardCount);
 
 
         //now verify it's correct for the target
 
-        shardCount = nodeShardApproximation.getCount( scope, sourceId, 0l, edgeType );
+        shardCount = nodeShardApproximation.getCount( scope, sourceId, NodeType.SOURCE,  0l, edgeType );
 
         assertEquals(1, shardCount);
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
index 6135a2d..937a51a 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
@@ -96,16 +96,16 @@ public class EdgeShardSerializationTest {
 
         String[] types = { "edgeType", "subType" };
 
-        MutationBatch batch = edgeShardSerialization.writeEdgeMeta( scope, now, slice1, timestamp, types );
+        MutationBatch batch = edgeShardSerialization.writeEdgeMeta( scope, now, NodeType.SOURCE, slice1, timestamp, types );
 
-        batch.mergeShallow( edgeShardSerialization.writeEdgeMeta( scope, now, slice2, timestamp, types ) );
+        batch.mergeShallow( edgeShardSerialization.writeEdgeMeta( scope, now, NodeType.SOURCE, slice2, timestamp, types ) );
 
-        batch.mergeShallow( edgeShardSerialization.writeEdgeMeta( scope, now, slice3, timestamp, types ) );
+        batch.mergeShallow( edgeShardSerialization.writeEdgeMeta( scope, now, NodeType.SOURCE, slice3, timestamp, types ) );
 
         batch.execute();
 
 
-        Iterator<Shard> results = edgeShardSerialization.getEdgeMetaData( scope, now, Optional.<Shard>absent(), types );
+        Iterator<Shard> results = edgeShardSerialization.getEdgeMetaData( scope, now, NodeType.SOURCE, Optional.<Shard>absent(), types );
 
         Shard next = results.next();
 
@@ -129,8 +129,14 @@ public class EdgeShardSerializationTest {
 
         assertFalse( results.hasNext() );
 
+        //test we get nothing with the other node type
+        results = edgeShardSerialization.getEdgeMetaData( scope, now, NodeType.TARGET, Optional.<Shard>absent(), types );
+
+        assertFalse(results.hasNext());
+
+
         //test paging and size
-        results = edgeShardSerialization.getEdgeMetaData( scope, now, Optional.of( new Shard( slice2, 0l ) ), types );
+        results = edgeShardSerialization.getEdgeMetaData( scope, now, NodeType.SOURCE, Optional.of( new Shard( slice2, 0l ) ), types );
 
         next = results.next();
 
@@ -162,16 +168,16 @@ public class EdgeShardSerializationTest {
 
         String[] types = { "edgeType", "subType" };
 
-        MutationBatch batch = edgeShardSerialization.writeEdgeMeta( scope, now, slice1, timestamp, types );
+        MutationBatch batch = edgeShardSerialization.writeEdgeMeta( scope, now, NodeType.SOURCE, slice1, timestamp, types );
 
-        batch.mergeShallow( edgeShardSerialization.writeEdgeMeta( scope, now, slice2, timestamp,types ) );
+        batch.mergeShallow( edgeShardSerialization.writeEdgeMeta( scope, now, NodeType.SOURCE, slice2, timestamp,types ) );
 
-        batch.mergeShallow( edgeShardSerialization.writeEdgeMeta( scope, now, slice3, timestamp, types ) );
+        batch.mergeShallow( edgeShardSerialization.writeEdgeMeta( scope, now, NodeType.SOURCE, slice3, timestamp, types ) );
 
         batch.execute();
 
 
-        Iterator<Shard> results = edgeShardSerialization.getEdgeMetaData( scope, now, Optional.<Shard>absent(), types );
+        Iterator<Shard> results = edgeShardSerialization.getEdgeMetaData( scope, now, NodeType.SOURCE, Optional.<Shard>absent(), types );
 
         assertEquals( slice3, results.next().getShardIndex() );
 
@@ -181,10 +187,15 @@ public class EdgeShardSerializationTest {
 
         assertFalse( results.hasNext() );
 
+        //test nothing with other type
+        results = edgeShardSerialization.getEdgeMetaData( scope, now, NodeType.TARGET, Optional.<Shard>absent(), types );
+
+        assertFalse(results.hasNext());
+
         //test paging and size
-        edgeShardSerialization.removeEdgeMeta( scope, now, slice1, types ).execute();
+        edgeShardSerialization.removeEdgeMeta( scope, now, NodeType.SOURCE, slice1, types ).execute();
 
-        results = edgeShardSerialization.getEdgeMetaData( scope, now,Optional.<Shard>absent(), types );
+        results = edgeShardSerialization.getEdgeMetaData( scope, now, NodeType.SOURCE, Optional.<Shard>absent(), types );
 
         assertEquals( slice3, results.next().getShardIndex() );
 
@@ -193,11 +204,11 @@ public class EdgeShardSerializationTest {
         assertFalse( results.hasNext() );
 
 
-        edgeShardSerialization.removeEdgeMeta( scope, now, slice2, types ).execute();
+        edgeShardSerialization.removeEdgeMeta( scope, now, NodeType.SOURCE, slice2, types ).execute();
 
-        edgeShardSerialization.removeEdgeMeta( scope, now, slice3, types ).execute();
+        edgeShardSerialization.removeEdgeMeta( scope, now, NodeType.SOURCE, slice3, types ).execute();
 
-        results = edgeShardSerialization.getEdgeMetaData( scope, now, Optional.<Shard>absent(), types );
+        results = edgeShardSerialization.getEdgeMetaData( scope, now, NodeType.SOURCE, Optional.<Shard>absent(), types );
 
 
         assertFalse( results.hasNext() );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index 7546531..383e32c 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -44,6 +44,7 @@ import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.crea
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -136,15 +137,18 @@ public class NodeShardAllocationTest {
          * Mock up returning an empty iterator, our audit shouldn't create a new shard
          */
         when( edgeShardSerialization
-                .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ),  same( type ),
+                .getEdgeMetaData( same( scope ), same( nodeId ), eq( NodeType.SOURCE ), any( Optional.class ),  same( type ),
                         same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
 
-        final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+        final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.SOURCE, type, subType );
 
         assertFalse( "No shard allocated", result );
     }
 
 
+
+
+
     @Test
     public void existingFutureShardSameTime() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
@@ -182,10 +186,10 @@ public class NodeShardAllocationTest {
          * Mock up returning a min shard, and a future shard
          */
         when( edgeShardSerialization
-                .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ),  same( type ),
+                .getEdgeMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET), any( Optional.class ),  same( type ),
                         same( subType ) ) ).thenReturn( Arrays.asList( futureShard ).iterator() );
 
-        final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+        final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.TARGET,  type, subType );
 
         assertFalse( "No shard allocated", result );
     }
@@ -226,7 +230,7 @@ public class NodeShardAllocationTest {
          * Mock up returning a min shard, and a future shard
          */
         when( edgeShardSerialization
-                .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ),  same( type ),
+                .getEdgeMetaData( same( scope ), same( nodeId ), eq(NodeType.TARGET), any( Optional.class ),  same( type ),
                         same( subType ) ) ).thenReturn( Arrays.asList( new Shard(0l, 0l) ).iterator() );
 
 
@@ -234,9 +238,9 @@ public class NodeShardAllocationTest {
 
         final long count = graphFig.getShardSize() - 1;
 
-        when( nodeShardApproximation.getCount(scope, nodeId, 0l, type, subType )).thenReturn( count );
+        when( nodeShardApproximation.getCount(scope, nodeId, NodeType.TARGET, 0l, type, subType )).thenReturn( count );
 
-        final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+        final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.TARGET, type, subType );
 
         assertFalse( "Shard allocated", result );
     }
@@ -277,7 +281,7 @@ public class NodeShardAllocationTest {
          * Mock up returning a min shard
          */
         when( edgeShardSerialization
-                .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ),  same( type ),
+                .getEdgeMetaData( same( scope ), same( nodeId ), eq(NodeType.SOURCE), any( Optional.class ),  same( type ),
                         same( subType ) ) ).thenReturn( Arrays.asList( new Shard(0l, 0l) ).iterator() );
 
 
@@ -285,7 +289,7 @@ public class NodeShardAllocationTest {
 
         //return a shard size equal to our max
         when( nodeShardApproximation
-                .getCount(   scope , nodeId, 0l,type , subType  ))
+                .getCount(   scope , nodeId, NodeType.SOURCE, 0l,type , subType  ))
                 .thenReturn( shardCount );
 
         ArgumentCaptor<Long> shardValue = ArgumentCaptor.forClass( Long.class );
@@ -294,11 +298,11 @@ public class NodeShardAllocationTest {
 
         //mock up our mutation
         when( edgeShardSerialization
-                .writeEdgeMeta( same( scope ), same( nodeId ), shardValue.capture(), timestampValue.capture(), same( type ), same( subType ) ) )
+                .writeEdgeMeta( same( scope ), same( nodeId ), eq(NodeType.SOURCE), shardValue.capture(), timestampValue.capture(), same( type ), same( subType ) ) )
                 .thenReturn( mock( MutationBatch.class ) );
 
 
-        final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+        final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.SOURCE,  type, subType );
 
         assertTrue( "Shard allocated", result );
 
@@ -379,7 +383,7 @@ public class NodeShardAllocationTest {
          * Mock up returning a min shard
          */
         when( edgeShardSerialization
-                .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ), same( type ),
+                .getEdgeMetaData( same( scope ), same( nodeId ), eq(NodeType.TARGET), any( Optional.class ), same( type ),
                         same( subType ) ) ).thenReturn( Arrays.asList(futureShard3, futureShard2, futureShard1, minShard).iterator() );
 
 
@@ -391,12 +395,12 @@ public class NodeShardAllocationTest {
 
         //mock up our mutation
         when( edgeShardSerialization
-                .removeEdgeMeta( same( scope ), same( nodeId ), newLongValue.capture(), same( type ), same( subType ) ) )
+                .removeEdgeMeta( same( scope ), same( nodeId ), eq(NodeType.TARGET), newLongValue.capture(), same( type ), same( subType ) ) )
                 .thenReturn( mock( MutationBatch.class ) );
 
 
         final Iterator<Shard>
-                result = approximation.getSourceShards( scope, nodeId, Optional.<Shard>absent(), type, subType );
+                result = approximation.getShards( scope, nodeId, NodeType.TARGET, Optional.<Shard>absent(), type, subType );
 
 
         assertTrue( "Shards present", result.hasNext() );
@@ -453,10 +457,10 @@ public class NodeShardAllocationTest {
          * Mock up returning an empty iterator, our audit shouldn't create a new shard
          */
         when( edgeShardSerialization
-                .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ),  same( type ),
+                .getEdgeMetaData( same( scope ), same( nodeId ), eq(NodeType.TARGET), any( Optional.class ),  same( type ),
                         same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
 
-        final Iterator<Shard> result = approximation.getSourceShards( scope, nodeId, Optional.<Shard>absent(), type,
+        final Iterator<Shard> result = approximation.getShards( scope, nodeId, NodeType.TARGET,  Optional.<Shard>absent(), type,
                 subType );
 
         assertEquals("0 shard allocated", 0l, result.next().getShardIndex());

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
index da19ce5..51448b7 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
@@ -39,12 +39,15 @@ import org.junit.Test;
 import org.safehaus.guicyfig.Bypass;
 import org.safehaus.guicyfig.OptionState;
 import org.safehaus.guicyfig.Overrides;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
@@ -68,6 +71,7 @@ import static org.mockito.Mockito.when;
 
 public class NodeShardApproximationTest {
 
+    private static final Logger LOG = LoggerFactory.getLogger( NodeShardApproximation.class );
 
     private GraphFig graphFig;
 
@@ -92,25 +96,24 @@ public class NodeShardApproximationTest {
 
         when( graphFig.getShardCacheSize() ).thenReturn( 10000l );
         when( graphFig.getShardSize() ).thenReturn( 250000l );
+        when( graphFig.getCounterFlushQueueSize() ).thenReturn( 10000 );
 
         nodeShardCounterSerialization = mock( NodeShardCounterSerialization.class );
 
-        when(nodeShardCounterSerialization.flush( any(Counter.class) )).thenReturn( mock( MutationBatch.class) );
-
+        when( nodeShardCounterSerialization.flush( any( Counter.class ) ) ).thenReturn( mock( MutationBatch.class ) );
 
 
         timeService = mock( TimeService.class );
 
-        when(timeService.getCurrentTime()).thenReturn( System.currentTimeMillis() );
+        when( timeService.getCurrentTime() ).thenReturn( System.currentTimeMillis() );
     }
 
 
     @Test
-    public void testSingleShard() {
-
+    public void testSingleShard() throws InterruptedException {
 
-        when(graphFig.getCounterFlushCount()).thenReturn( 100000l );
 
+        when( graphFig.getCounterFlushCount() ).thenReturn( 100000l );
         NodeShardApproximation approximation =
                 new NodeShardApproximationImpl( graphFig, nodeShardCounterSerialization, timeService );
 
@@ -120,7 +123,9 @@ public class NodeShardApproximationTest {
         final String type = "type";
         final String type2 = "subType";
 
-        long count = approximation.getCount( scope, id, shardId, type, type2 );
+        long count = approximation.getCount( scope, id, NodeType.TARGET, shardId, type, type2 );
+
+        waitForFlush( approximation );
 
         assertEquals( 0, count );
     }
@@ -130,8 +135,6 @@ public class NodeShardApproximationTest {
     public void testSingleShardMultipleThreads() throws ExecutionException, InterruptedException {
 
 
-
-
         NodeShardCounterSerialization serialization = new TestNodeShardCounterSerialization();
 
         final NodeShardApproximation approximation =
@@ -158,7 +161,7 @@ public class NodeShardApproximationTest {
                 public Long call() throws Exception {
 
                     for ( int i = 0; i < increments; i++ ) {
-                        approximation.increment( scope, id, shardId, 1, type, type2 );
+                        approximation.increment( scope, id, NodeType.TARGET, shardId, 1, type, type2 );
                     }
 
                     return 0l;
@@ -169,24 +172,25 @@ public class NodeShardApproximationTest {
         }
 
 
-
         for ( Future<Long> future : futures ) {
-           future.get();
+            future.get();
         }
 
-
+        waitForFlush( approximation );
         //get our count.  It should be accurate b/c we only have 1 instance
 
-        final long returnedCount = approximation.getCount( scope, id, shardId, type, type2);
+        final long returnedCount = approximation.getCount( scope, id, NodeType.TARGET, shardId, type, type2 );
         final long expected = workers * increments;
 
 
-        assertEquals(expected, returnedCount);
-
+        assertEquals( expected, returnedCount );
 
+        //test we get nothing with the other type
 
+        final long emptyCount = approximation.getCount( scope, id, NodeType.SOURCE, shardId, type, type2 );
 
 
+        assertEquals( 0, emptyCount );
     }
 
 
@@ -195,8 +199,6 @@ public class NodeShardApproximationTest {
     public void testMultipleShardMultipleThreads() throws ExecutionException, InterruptedException {
 
 
-
-
         NodeShardCounterSerialization serialization = new TestNodeShardCounterSerialization();
 
         final NodeShardApproximation approximation =
@@ -210,8 +212,7 @@ public class NodeShardApproximationTest {
         final String type = "type";
         final String type2 = "subType";
 
-        final AtomicLong shardIdCounter = new AtomicLong(  );
-
+        final AtomicLong shardIdCounter = new AtomicLong();
 
 
         ExecutorService executor = Executors.newFixedThreadPool( workers );
@@ -227,7 +228,7 @@ public class NodeShardApproximationTest {
                     final long threadShardId = shardIdCounter.incrementAndGet();
 
                     for ( int i = 0; i < increments; i++ ) {
-                        approximation.increment( scope, id, threadShardId, 1, type, type2 );
+                        approximation.increment( scope, id, NodeType.SOURCE, threadShardId, 1, type, type2 );
                     }
 
                     return threadShardId;
@@ -238,29 +239,41 @@ public class NodeShardApproximationTest {
         }
 
 
-
         for ( Future<Long> future : futures ) {
-           final long shardId = future.get();
+            final long shardId = future.get();
+
+            waitForFlush( approximation );
 
-            final long returnedCount = approximation.getCount( scope, id, shardId, type, type2);
+            final long returnedCount = approximation.getCount( scope, id, NodeType.SOURCE, shardId, type, type2 );
 
-            assertEquals(increments, returnedCount);
+            assertEquals( increments, returnedCount );
         }
+    }
 
 
+    private void waitForFlush( NodeShardApproximation approximation ) throws InterruptedException {
 
+        approximation.beginFlush();
 
+        while ( approximation.flushPending() ) {
+
+            LOG.info("Waiting on beginFlush to complete");
+
+            Thread.sleep( 100 );
+        }
     }
 
 
+
     /**
      * These are created b/c we can't use Mockito.  It OOM's with keeping track of all the mock invocations
      */
 
-    private static class TestNodeShardCounterSerialization implements NodeShardCounterSerialization{
+    private static class TestNodeShardCounterSerialization implements NodeShardCounterSerialization {
 
         private Counter copy = new Counter();
 
+
         @Override
         public MutationBatch flush( final Counter counter ) {
             copy.merge( counter );
@@ -281,7 +294,6 @@ public class NodeShardApproximationTest {
     }
 
 
-
     /**
      * Simple test mutation to no-op during tests
      */
@@ -415,14 +427,14 @@ public class NodeShardApproximationTest {
     }
 
 
-
-    private static class TestGraphFig implements GraphFig{
+    private static class TestGraphFig implements GraphFig {
 
         @Override
         public int getScanPageSize() {
             return 0;  //To change body of implemented methods use File | Settings | File Templates.
         }
 
+
         @Override
         public int getRepairConcurrentSize() {
             return 0;  //To change body of implemented methods use File | Settings | File Templates.
@@ -460,6 +472,12 @@ public class NodeShardApproximationTest {
 
 
         @Override
+        public int getCounterFlushQueueSize() {
+            return 10000;
+        }
+
+
+        @Override
         public void addPropertyChangeListener( final PropertyChangeListener propertyChangeListener ) {
             //To change body of implemented methods use File | Settings | File Templates.
         }
@@ -555,7 +573,8 @@ public class NodeShardApproximationTest {
         }
     }
 
-    private static class TestTimeService implements TimeService{
+
+    private static class TestTimeService implements TimeService {
 
         @Override
         public long getCurrentTime() {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java
index 9968f67..aad918e 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java
@@ -35,6 +35,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
@@ -96,11 +97,11 @@ public class NodeShardCounterSerializationTest {
 
         final Id id = createId( "test" );
 
-        ShardKey key1 = new ShardKey( scope, id, 0, "type1" );
+        ShardKey key1 = new ShardKey( scope, id, NodeType.SOURCE, 0, "type1" );
 
-        ShardKey key2 = new ShardKey( scope, id, 0, "type2" );
+        ShardKey key2 = new ShardKey( scope, id, NodeType.SOURCE, 0, "type2" );
 
-        ShardKey key3 = new ShardKey( scope, id, 1, "type1" );
+        ShardKey key3 = new ShardKey( scope, id, NodeType.SOURCE, 1, "type1" );
 
 
         Counter counter = new Counter();