You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/27 22:21:31 UTC

[17/50] [abbrv] git commit: Merge with hystrix. Added debug logging

Merge with hystrix.  Added debug logging


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/029c0650
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/029c0650
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/029c0650

Branch: refs/pull/77/merge
Commit: 029c06502c7359117a49638a7a198375dc0bb868
Parents: 136edab
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 25 17:32:31 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 25 17:32:31 2014 -0700

----------------------------------------------------------------------
 .../graph/impl/NodeDeleteListener.java          |  35 +++---
 .../impl/EdgeSerializationImpl.java             |  16 +--
 .../impl/parse/ObservableIterator.java          |  12 +-
 .../graph/GraphManagerTimeoutIT.java            |  48 ++++----
 .../graph/impl/NodeDeleteListenerTest.java      |  41 ++++---
 .../serialization/EdgeSerializationTest.java    |  93 ++++++++++-----
 .../graph/serialization/TestCount.java          | 117 +++++++++++++++++++
 .../graph/test/util/EdgeTestUtils.java          |  12 +-
 stack/corepersistence/pom.xml                   |   2 +-
 9 files changed, 271 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/029c0650/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index 4ba0142..d0c0bec 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -53,30 +53,20 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
     private final EdgeMetadataSerialization edgeMetadataSerialization;
     private final EdgeDeleteRepair edgeDeleteRepair;
     private final EdgeMetaRepair edgeMetaRepair;
-<<<<<<< Updated upstream
-=======
     private final GraphFig graphFig;
     protected final Keyspace keyspace;
->>>>>>> Stashed changes
 
 
     /**
      * Wire the serialization dependencies
      */
     @Inject
-<<<<<<< Updated upstream
-    public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization, @NodeDelete final AsyncProcessor nodeDelete,
-                               final EdgeMetadataSerialization edgeMetadataSerialization,
-                               final EdgeDeleteRepair edgeDeleteRepair, final EdgeMetaRepair edgeMetaRepair
-                                ) {
-=======
     public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization,
 
                                final EdgeMetadataSerialization edgeMetadataSerialization,
                                final EdgeDeleteRepair edgeDeleteRepair, final EdgeMetaRepair edgeMetaRepair,
                                final GraphFig graphFig, @NodeDelete final AsyncProcessor nodeDelete,
                                final Keyspace keyspace ) {
->>>>>>> Stashed changes
 
 
         this.nodeSerialization = nodeSerialization;
@@ -163,11 +153,13 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
                             @Override
                             public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
 
+                                LOG.debug( "Batching {} edges for deletion" , markedEdges.size());
+
                                 final MutationBatch batch = keyspace.prepareMutationBatch();
 
                                 for(MarkedEdge edge: markedEdges){
 
-                                //delete the newest edge <= the version on the node delete
+                                    //delete the newest edge <= the version on the node delete
                                     LOG.debug( "Deleting edge {}", edge );
                                     final MutationBatch delete = edgeSerialization.deleteEdge( scope,  edge );
 
@@ -184,12 +176,17 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
                                 return Observable.from(markedEdges);
                             }
                         } )
-        .flatMap( new Func1<MarkedEdge, Observable<MarkedEdge>>() {
-            @Override
-            public Observable<MarkedEdge> call( final MarkedEdge edge ) {
+                        //TODO Fix this
+//        .flatMap( new Func1<MarkedEdge, Observable<MarkedEdge>>() {
+//            @Override
+//            public Observable<MarkedEdge> call( final MarkedEdge edge ) {
+//
+//
+//                return Observable.just( edge );
+
+
 
 
-                return Observable.just( edge );
 //                //delete both the source and target meta data in parallel for the edge we deleted in the previous step
 //                //if nothing else is using them
 //                Observable<Integer> sourceMetaRepaired =
@@ -209,8 +206,12 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
 //                                         return edge;
 //                                     }
 //                                 } );
-            }
-        } ).count()
+
+
+//            }
+//        })
+
+    .count()
                 //if nothing is ever emitted, emit 0 so that we know no operations took place. Finally remove the
                 // target node in the mark
                 .defaultIfEmpty( 0 ).doOnCompleted( new Action0() {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/029c0650/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index d255929..0341cac 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -156,8 +156,8 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
         doWrite( scope, edge, new RowOp<RowKey>() {
             @Override
-            public <R extends RowKey> void doWrite(
-                    final MultiTennantColumnFamily<OrganizationScope, R, DirectedEdge> columnFamily, final R rowKey,
+            public  void doWrite(
+                    final MultiTennantColumnFamily<OrganizationScope, RowKey, DirectedEdge> columnFamily, final RowKey rowKey,
                     final DirectedEdge edge ) {
                 batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( edge, false );
             }
@@ -182,8 +182,8 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
         doWrite( scope, edge, new RowOp<RowKey>() {
             @Override
-            public <R extends RowKey> void doWrite(
-                    final MultiTennantColumnFamily<OrganizationScope, R, DirectedEdge> columnFamily, final R rowKey,
+            public void doWrite(
+                    final MultiTennantColumnFamily<OrganizationScope, RowKey, DirectedEdge> columnFamily, final RowKey rowKey,
                     final DirectedEdge edge ) {
                 batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( edge, true );
             }
@@ -208,8 +208,8 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
         doWrite( scope, edge, new RowOp<RowKey>() {
             @Override
-            public <R extends RowKey> void doWrite(
-                    final MultiTennantColumnFamily<OrganizationScope, R, DirectedEdge> columnFamily, final R rowKey,
+            public void doWrite(
+                    final MultiTennantColumnFamily<OrganizationScope, RowKey, DirectedEdge> columnFamily, final RowKey rowKey,
                     final DirectedEdge edge ) {
                 batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( edge );
             }
@@ -493,7 +493,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
                         .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
 
 
-        return new ColumnNameIterator<DirectedEdge, MarkedEdge>( query, searcher, searcher.hasPage() );
+        return new ColumnNameIterator<DirectedEdge, MarkedEdge>( query, searcher, searcher.hasPage(), graphFig.getReadTimeout() );
     }
 
 
@@ -837,7 +837,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
         /**
          * Write the edge with the given data
          */
-        <R extends RowKey> void doWrite(
+        void doWrite(
                 final MultiTennantColumnFamily<OrganizationScope, R, DirectedEdge> columnFamily, R rowKey,
                 DirectedEdge edge );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/029c0650/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
index 2274868..2d2456b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
@@ -3,6 +3,10 @@ package org.apache.usergrid.persistence.graph.serialization.impl.parse;
 
 import java.util.Iterator;
 
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import rx.Observable;
 import rx.Observer;
 import rx.Subscriber;
@@ -17,6 +21,8 @@ import rx.subscriptions.Subscriptions;
  */
 public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T> {
 
+    private static final Logger log = LoggerFactory.getLogger( ObservableIterator.class );
+
 
     @Override
     public void call( final Subscriber<? super T> subscriber ) {
@@ -29,7 +35,11 @@ public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T>
 
             //while we have items to emit and our subscriber is subscribed, we want to keep emitting items
             while ( itr.hasNext() && !subscriber.isUnsubscribed()) {
-                subscriber.onNext( itr.next() );
+                final T next = itr.next();
+
+                log.debug( "Emitting {}", next );
+
+                subscriber.onNext( next );
             }
 
             subscriber.onCompleted();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/029c0650/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerTimeoutIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerTimeoutIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerTimeoutIT.java
index 509fe6a..ed8c1b8 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerTimeoutIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerTimeoutIT.java
@@ -119,7 +119,7 @@ public class GraphManagerTimeoutIT {
     public void testWriteReadEdgeTypeSource( EdgeSerialization serialization ) throws InterruptedException {
 
 
-        final GraphManager em = emf.createGraphManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
 
         final MarkedEdge edge = createEdge( "source", "edge", "target" );
@@ -223,7 +223,7 @@ public class GraphManagerTimeoutIT {
     @Test
     public void testWriteReadEdgeTypeTarget() {
 
-        GraphManager em = emf.createGraphManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
 
         Edge edge = createEdge( "source", "test", "target" );
@@ -256,7 +256,7 @@ public class GraphManagerTimeoutIT {
     @Test
     public void testWriteReadEdgeTypeVersionSource() {
 
-        GraphManager em = emf.createGraphManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
         final UUID earlyVersion = UUIDGenerator.newTimeUUID();
 
@@ -291,7 +291,7 @@ public class GraphManagerTimeoutIT {
     @Test
     public void testWriteReadEdgeTypeVersionTarget() {
 
-        GraphManager em = emf.createGraphManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
 
         final UUID earlyVersion = UUIDGenerator.newTimeUUID();
@@ -330,7 +330,7 @@ public class GraphManagerTimeoutIT {
     @Test
     public void testWriteReadEdgeTypeVersionSourceDistinct() {
 
-        GraphManager em = emf.createGraphManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
         final UUID earlyVersion = UUIDGenerator.newTimeUUID();
 
@@ -399,7 +399,7 @@ public class GraphManagerTimeoutIT {
     public void testWriteReadEdgeTypeVersionTargetDistinct() {
 
 
-        GraphManager em = emf.createGraphManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
         final UUID earlyVersion = UUIDGenerator.newTimeUUID();
 
@@ -467,7 +467,7 @@ public class GraphManagerTimeoutIT {
     @Test
     public void testWriteReadEdgeTypePagingSource() {
 
-        GraphManager em = emf.createGraphManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
         final Id sourceId = createId( "source" );
 
@@ -518,7 +518,7 @@ public class GraphManagerTimeoutIT {
     public void testWriteReadEdgeTypePagingTarget() {
 
 
-        GraphManager em = emf.createGraphManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
 
         final Id targetId = createId( "target" );
@@ -569,7 +569,7 @@ public class GraphManagerTimeoutIT {
     @Test
     public void testWriteReadEdgeTypeTargetTypeSource() {
 
-        GraphManager em = emf.createGraphManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
 
         Edge edge = createEdge( "source", "test", "target" );
@@ -605,7 +605,7 @@ public class GraphManagerTimeoutIT {
     @Test
     public void testWriteReadEdgeTypeTargetTypeTarget() {
 
-        GraphManager em = emf.createGraphManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
 
         Edge edge = createEdge( "source", "test", "target" );
@@ -641,7 +641,7 @@ public class GraphManagerTimeoutIT {
     @Test
     public void testWriteReadEdgeDeleteSource() {
 
-        GraphManager em = emf.createGraphManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
 
         Edge edge = createEdge( "source", "test", "target" );
@@ -697,7 +697,7 @@ public class GraphManagerTimeoutIT {
     @Test
     public void testWriteReadEdgeDeleteTarget() {
 
-        GraphManager em = emf.createGraphManager( scope );
+        GraphManager em = emf.createEdgeManager( scope );
 
 
         Edge edge = createEdge( "source", "test", "target" );
@@ -753,7 +753,7 @@ public class GraphManagerTimeoutIT {
     @Test
     public void testWriteReadEdgeTypesSourceTypes() {
 
-        final GraphManager em = emf.createGraphManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         Id sourceId = new SimpleId( "source" );
         Id targetId1 = new SimpleId( "target" );
@@ -815,7 +815,7 @@ public class GraphManagerTimeoutIT {
     @Test
     public void testWriteReadEdgeTypesTargetTypes() {
 
-        final GraphManager em = emf.createGraphManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id sourceId2 = new SimpleId( "source2" );
@@ -880,7 +880,7 @@ public class GraphManagerTimeoutIT {
     @Test
     public void testWriteReadEdgeTypesSourceTypesPaging() {
 
-        final GraphManager em = emf.createGraphManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id targetId1 = new SimpleId( "target" );
@@ -958,7 +958,7 @@ public class GraphManagerTimeoutIT {
     @Test
     public void testWriteReadEdgeTypesTargetTypesPaging() {
 
-        final GraphManager em = emf.createGraphManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id sourceId2 = new SimpleId( "source2" );
@@ -1039,7 +1039,7 @@ public class GraphManagerTimeoutIT {
     @Test
     public void testMarkSourceEdges() {
 
-        final GraphManager em = emf.createGraphManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         Id sourceId = new SimpleId( "source" );
         Id targetId1 = new SimpleId( "target" );
@@ -1108,7 +1108,7 @@ public class GraphManagerTimeoutIT {
     @Test
     public void testMarkTargetEdges() {
 
-        final GraphManager em = emf.createGraphManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id sourceId2 = new SimpleId( "source2" );
@@ -1175,7 +1175,7 @@ public class GraphManagerTimeoutIT {
     @Test
     public void testMarkSourceEdgesType() {
 
-        final GraphManager em = emf.createGraphManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         Id sourceId = new SimpleId( "source" );
         Id targetId1 = new SimpleId( "target" );
@@ -1253,7 +1253,7 @@ public class GraphManagerTimeoutIT {
     @Test
     public void testMarkTargetEdgesType() {
 
-        final GraphManager em = emf.createGraphManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id sourceId2 = new SimpleId( "source2" );
@@ -1331,7 +1331,7 @@ public class GraphManagerTimeoutIT {
     @Test
     public void markSourceNode() {
 
-        final GraphManager em = emf.createGraphManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         Id sourceId = new SimpleId( "source" );
         Id targetId1 = new SimpleId( "target" );
@@ -1413,7 +1413,7 @@ public class GraphManagerTimeoutIT {
     @Test
     public void markTargetNode() {
 
-        final GraphManager em = emf.createGraphManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id sourceId2 = new SimpleId( "source2" );
@@ -1494,7 +1494,7 @@ public class GraphManagerTimeoutIT {
 
     @Test( expected = NullPointerException.class )
     public void invalidEdgeTypesWrite( @All Edge edge ) {
-        final GraphManager em = emf.createGraphManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         em.writeEdge( edge );
     }
@@ -1502,7 +1502,7 @@ public class GraphManagerTimeoutIT {
 
     @Test( expected = NullPointerException.class )
     public void invalidEdgeTypesDelete( @All Edge edge ) {
-        final GraphManager em = emf.createGraphManager( scope );
+        final GraphManager em = emf.createEdgeManager( scope );
 
         em.deleteEdge( edge );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/029c0650/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
index e735aea..48890ca 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
@@ -11,6 +11,8 @@ import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.OrganizationScope;
 import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
@@ -50,6 +52,8 @@ import static org.mockito.Mockito.when;
 @UseModules( { TestGraphModule.class } )
 public class NodeDeleteListenerTest {
 
+    private static final Logger log = LoggerFactory.getLogger( NodeDeleteListenerTest.class );
+
     @ClassRule
     public static CassandraRule rule = new CassandraRule();
 
@@ -309,8 +313,8 @@ public class NodeDeleteListenerTest {
 
 
     /**
-     * Simple test case that tests a single edge and removing the node.  The other target node should be removed as
-     * well since it has no other targets
+     * Simple test case that tests a single edge and removing the node.  The other target node should be removed as well
+     * since it has no other targets
      */
     @Test
     public void testMultiDelete() throws ConnectionException {
@@ -324,17 +328,21 @@ public class NodeDeleteListenerTest {
         final String edgeType = "test";
 
         int countSaved = 0;
+        int sourceCount = 0;
+        int targetCount = 0;
 
 
         for ( int i = 0; i < edgeCount; i++ ) {
-            Edge edge ;
+            Edge edge;
 
             //mix up source vs target, good for testing as well as create a lot of sub types to ensure they're removed
             if ( i % 2 == 0 ) {
-                edge = createEdge( toDelete, edgeType, createId( "target"+Math.random() ) );
+                edge = createEdge( toDelete, edgeType, createId( "target" + Math.random() ) );
+                sourceCount++;
             }
             else {
-                edge = createEdge( createId( "source"+Math.random() ), edgeType, toDelete );
+                edge = createEdge( createId( "source" + Math.random() ), edgeType, toDelete );
+                targetCount++;
             }
 
             //write the edge
@@ -346,13 +354,15 @@ public class NodeDeleteListenerTest {
             countSaved++;
         }
 
-        assertEquals(edgeCount, countSaved);
+        assertEquals( edgeCount, countSaved );
+
+        log.info( "Saved {} source edges", sourceCount );
+        log.info( "Saved {} target edges", targetCount );
 
 
         //mark the node so
-//        UUID deleteVersion = UUIDGenerator.newTimeUUID();
+        UUID deleteVersion = UUIDGenerator.newTimeUUID();
 
-        UUID deleteVersion = UUID.fromString( "ffffffff-ffff-1fff-bfff-ffffffffffff" );
 
         nodeSerialization.mark( scope, toDelete, deleteVersion ).execute();
 
@@ -368,21 +378,19 @@ public class NodeDeleteListenerTest {
         UUID now = UUIDGenerator.newTimeUUID();
 
 
-        Iterator<MarkedEdge> returned = edgeSerialization
-                .getEdgesFromSource( scope, createSearchByEdge( toDelete, edgeType, now, null ) );
+        Iterator<MarkedEdge> returned =
+                edgeSerialization.getEdgesFromSource( scope, createSearchByEdge( toDelete, edgeType, now, null ) );
 
         //no edge from source node should be returned
         assertFalse( "No source should be returned", returned.hasNext() );
 
         //validate it's not returned by the
 
-        returned = edgeSerialization
-                .getEdgesToTarget( scope, createSearchByEdge( toDelete, edgeType, now, null ) );
+        returned = edgeSerialization.getEdgesToTarget( scope, createSearchByEdge( toDelete, edgeType, now, null ) );
 
         assertFalse( "No target should be returned", returned.hasNext() );
 
 
-
         //no types from source
 
         Iterator<String> types =
@@ -400,14 +408,13 @@ public class NodeDeleteListenerTest {
 
         //no target types from source
 
-        Iterator<String> idTypes = edgeMetadataSerialization
-                .getIdTypesFromSource( scope, createSearchIdType( toDelete, edgeType, null ) );
+        Iterator<String> idTypes =
+                edgeMetadataSerialization.getIdTypesFromSource( scope, createSearchIdType( toDelete, edgeType, null ) );
 
         assertFalse( idTypes.hasNext() );
 
         //no source types to target
-        idTypes = edgeMetadataSerialization
-                .getIdTypesToTarget( scope, createSearchIdType( toDelete, edgeType, null ) );
+        idTypes = edgeMetadataSerialization.getIdTypesToTarget( scope, createSearchIdType( toDelete, edgeType, null ) );
 
         assertFalse( idTypes.hasNext() );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/029c0650/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java
index 776b9df..9e016b2 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java
@@ -13,6 +13,8 @@ import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.OrganizationScope;
 import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
@@ -27,6 +29,8 @@ import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.fasterxml.uuid.UUIDComparator;
 import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
@@ -45,10 +49,12 @@ import static org.mockito.Mockito.when;
  *
  *
  */
-@RunWith(JukitoRunner.class)
-@UseModules({ TestGraphModule.class })
+@RunWith( JukitoRunner.class )
+@UseModules( { TestGraphModule.class } )
 public class EdgeSerializationTest {
 
+    private static final Logger log = LoggerFactory.getLogger( EdgeSerializationTest.class );
+
     @ClassRule
     public static CassandraRule rule = new CassandraRule();
 
@@ -64,6 +70,9 @@ public class EdgeSerializationTest {
     @Inject
     protected GraphFig graphFig;
 
+    @Inject
+    protected Keyspace keyspace;
+
     protected OrganizationScope scope;
 
 
@@ -617,51 +626,83 @@ public class EdgeSerializationTest {
     }
 
 
-
     /**
      * Tests mixing 2 edge types between 2 nodes.  We should get results for the same source->destination with the 2
      * edge types
      */
     @Test
     public void testIteratorPaging() throws ConnectionException {
-        final Edge edgev1 = createEdge( "source", "edge1", "target" );
 
-        final Id sourceId = edgev1.getSourceNode();
-        final Id targetId = edgev1.getTargetNode();
 
+        final Id sourceId = createId( "source" );
+        final String edgeType = "edge";
+        final Id targetId = createId( "target" );
 
-        final Edge edgev2 = createEdge( sourceId, "edge1", targetId );
 
-        assertTrue( "Edge version 1 has lower time uuid",
-                UUIDComparator.staticCompare( edgev1.getVersion(), edgev2.getVersion() ) < 0 );
+        int writeCount = graphFig.getScanPageSize() * 3;
 
-        //create edge type 2 to ensure we don't get it in results
-        final Edge edgeType2V1 = createEdge( sourceId, "edge2", targetId );
 
-        serialization.writeEdge( scope, edgev1 ).execute();
-        serialization.writeEdge( scope, edgev2 ).execute();
-        serialization.writeEdge( scope, edgeType2V1 ).execute();
+        final MutationBatch batch = keyspace.prepareMutationBatch();
 
-        final UUID now = UUIDGenerator.newTimeUUID();
+        UUID lastMax = null;
 
+        for ( int i = 0; i < writeCount; i++ ) {
 
-        SearchByEdge search = createGetByEdge( sourceId, "edge1", targetId, now, null );
+            final Edge edge = createEdge( sourceId, edgeType, targetId );
 
-        Iterator<MarkedEdge> results = serialization.getEdgeVersions( scope, search );
+            lastMax = edge.getVersion();
 
-        assertEquals( edgev2, results.next() );
-        assertEquals( edgev1, results.next() );
-        assertFalse( "No results should be returned", results.hasNext() );
+            batch.mergeShallow( serialization.writeEdge( scope, edge ) );
+        }
 
-        //max version test
+        log.info( "Flushing edges" );
+        batch.execute();
 
-        //test max version
-        search = createGetByEdge( sourceId, "edge1", targetId, edgev1.getVersion(), null );
 
-        results = serialization.getEdgeVersions( scope, search );
+        Iterator<MarkedEdge> results =
+                serialization.getEdgeVersions( scope, createGetByEdge( sourceId, edgeType, targetId, lastMax, null ) );
 
-        assertEquals( edgev1, results.next() );
-        assertFalse( "Max version was honored", results.hasNext() );
+        verify( results, writeCount );
+
+
+
+        //get them all from source
+        results = serialization.getEdgesFromSource( scope, createSearchByEdge( sourceId, edgeType, lastMax, null ) );
+
+        verify( results, writeCount );
+
+
+
+        results = serialization.getEdgesFromSourceByTargetType( scope,
+                createSearchByEdgeAndId( sourceId, edgeType, lastMax, targetId.getType(), null ) );
+
+        verify( results, writeCount );
+
+
+
+        results = serialization.getEdgesToTarget( scope, createSearchByEdge( targetId, edgeType, lastMax, null ) );
+
+        verify( results, writeCount );
+
+
+
+        results = serialization.getEdgesToTargetBySourceType( scope,
+                createSearchByEdgeAndId( targetId, edgeType, lastMax, sourceId.getType(), null ) );
+
+        verify( results, writeCount );
+    }
+
+
+    private void verify( Iterator<MarkedEdge> results, int expectedCount ) {
+        int count = 0;
+
+        while ( results.hasNext() ) {
+            count++;
+            results.next();
+        }
+
+
+        assertEquals( "All versions returned", expectedCount, count );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/029c0650/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/TestCount.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/TestCount.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/TestCount.java
new file mode 100644
index 0000000..7af55da
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/TestCount.java
@@ -0,0 +1,117 @@
+package org.apache.usergrid.persistence.graph.serialization;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Test;
+
+import rx.Observable;
+import rx.Subscriber;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ *
+ *
+ */
+public class TestCount {
+
+
+    @Test
+    public void mergeTest(){
+
+        final int sizePerObservable = 2000;
+
+        Observable<Integer> input1 = getObservables( sizePerObservable );
+        Observable<Integer> input2 = getObservables( sizePerObservable );
+
+       int returned =  Observable.merge(input1, input2).buffer( 1000 ).flatMap(
+               new Func1<List<Integer>, Observable<Integer>>() {
+                   @Override
+                   public Observable<Integer> call( final List<Integer> integers ) {
+
+                       //simulates batching a network operation from buffer, then re-emitting the values passed
+
+                       try {
+                           Thread.sleep( 100 );
+                       }
+                       catch ( InterruptedException e ) {
+                           throw new RuntimeException( e );
+                       }
+
+
+                       return Observable.from( integers );
+                   }
+               } ).count().defaultIfEmpty( 0 ).toBlockingObservable().last();
+
+
+        assertEquals("Count was correct", sizePerObservable*2, returned);
+    }
+
+
+    /**
+     * Get observables from the sets
+     * @param size
+     * @return
+     */
+    private Observable<Integer> getObservables( int size ){
+
+        final List<Integer> values = new ArrayList<Integer>(size);
+
+        for(int i = 0; i <size; i ++ ) {
+            values.add( i );
+        }
+
+
+        /**
+         * Simulates occasional sleeps while we fetch something over the network
+         */
+        return Observable.create( new Observable.OnSubscribe<Integer>() {
+            @Override
+            public void call( final Subscriber<? super Integer> subscriber ) {
+
+                final int size = values.size();
+
+                for(int i = 0; i < size; i ++){
+
+
+
+                    if(i%1000 == 0){
+                        //simulate network fetch
+                        try {
+                            Thread.sleep( 250 );
+                        }
+                        catch ( InterruptedException e ) {
+                            subscriber.onError( e );
+                            return;
+                        }
+                    }
+
+                    //Sleep for a very long time before emitting the last value
+                    if(i == size -1){
+                        try {
+                            Thread.sleep(5000);
+                        }
+                        catch ( InterruptedException e ) {
+                            subscriber.onError( e );
+                            return;
+                        }
+                    }
+
+
+                    subscriber.onNext( values.get( i ) );
+                }
+
+                subscriber.onCompleted();
+
+                //purposefully no error handling here
+            }
+        } ).subscribeOn( Schedulers.io() );
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/029c0650/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
index 68a67d2..27b9a38 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
@@ -70,22 +70,12 @@ public class EdgeTestUtils {
     /**
      * Create an edge with the specified params
      */
-    public static MarkedEdge createEdge( final Id sourceId, final String edgeType, final Id targetId,
-                                         final UUID version ) {
+    public static MarkedEdge createEdge( final Id sourceId, final String edgeType, final Id targetId, final UUID version ) {
         return new SimpleMarkedEdge( sourceId, edgeType, targetId, version, false );
     }
 
 
     /**
-     * Create an edge with the specified params
-     */
-    public static MarkedEdge createEdge( final String sourceType, final String edgeType, final String targetType,
-                                         final UUID version ) {
-        return new SimpleMarkedEdge( createId( sourceType ), edgeType, createId( targetType ), version, false );
-    }
-
-
-    /**
      * Create the id
      */
     public static Id createId( String type ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/029c0650/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 2beddd5..9b7b1e1 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -75,7 +75,7 @@
                     <systemPropertyVariables>
                         <archaius.deployment.environment>UNIT</archaius.deployment.environment>
                     </systemPropertyVariables>
-                    <argLine>-Xms2G -Xmx2G</argLine>
+                    <argLine>-Xms2G -Xmx4G</argLine>
                 </configuration>
             </plugin>