You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/25 16:21:11 UTC

[12/43] git commit: Passing tests for meta data cleanup.

Passing tests for meta data cleanup.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/78dc432d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/78dc432d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/78dc432d

Branch: refs/heads/two-dot-o
Commit: 78dc432d1ed80ca96776657ced8961fee34265d5
Parents: 4dd96aa
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 12 12:57:08 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 12 12:57:08 2014 -0700

----------------------------------------------------------------------
 .../graph/impl/stage/EdgeAsyncImpl.java         | 180 ++++++++++---------
 .../graph/impl/stage/EdgeAsyncTest.java         | 165 +++++++++++++++--
 2 files changed, 242 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/78dc432d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
index cb2cf6d..ba21bf0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
@@ -38,7 +38,6 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
-import org.apache.usergrid.persistence.graph.serialization.util.EdgeUtils;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Preconditions;
@@ -83,7 +82,7 @@ public class EdgeAsyncImpl implements EdgeAsync {
 
     @Override
     public Observable<Integer> cleanSources( final OrganizationScope scope, final Id sourceId, final String edgeType,
-                                       final UUID version ) {
+                                             final UUID version ) {
 
 
         return null;  //To change body of implemented methods use File | Settings | File Templates.
@@ -100,107 +99,114 @@ public class EdgeAsyncImpl implements EdgeAsync {
         Preconditions.checkNotNull( version, "version is required" );
 
 
+        Observable<Integer> deleteCounts =
+                loadEdgeIdsToTarget( scope, new SimpleSearchIdType( targetId, edgeType, null ) )
+                        .buffer( graphFig.getRepairConcurrentSize() )
+                                //buffer them into concurrent groups based on the concurrent repair size
+                        .flatMap( new Func1<List<String>, Observable<Integer>>() {
 
-        return loadEdgeIdsToTarget( scope, new SimpleSearchIdType( targetId, edgeType, null ) )
-                .buffer( graphFig.getRepairConcurrentSize() )
-                        //buffer them into concurrent groups based on the concurrent repair size
-                .flatMap( new Func1<List<String>, Observable<Integer>>() {
-
-                    @Override
-                    public Observable<Integer> call( final List<String> types ) {
+                            @Override
+                            public Observable<Integer> call( final List<String> types ) {
 
 
-                        final MutationBatch batch = keyspace.prepareMutationBatch();
+                                final MutationBatch batch = keyspace.prepareMutationBatch();
 
-                        final List<Observable<Integer>> checks = new ArrayList<Observable<Integer>>( types.size() );
+                                final List<Observable<Integer>> checks =
+                                        new ArrayList<Observable<Integer>>( types.size() );
 
-                        //for each id type, check if the exist in parallel to increase processing speed
-                        for ( final String sourceIdType : types ) {
+                                //for each id type, check if the exist in parallel to increase processing speed
+                                for ( final String sourceIdType : types ) {
 
-                            final SearchByIdType searchData =   new SimpleSearchByIdType( targetId, edgeType, version, sourceIdType, null );
+                                    final SearchByIdType searchData =
+                                            new SimpleSearchByIdType( targetId, edgeType, version, sourceIdType, null );
 
-                            Observable<Integer> search = getEdgesToTargetBySourceType( scope, searchData
-                                    )
-                                    .distinctUntilChanged( new Func1<MarkedEdge, Id>() {
+                                    Observable<Integer> search = getEdgesToTargetBySourceType( scope, searchData )
+                                            .distinctUntilChanged( new Func1<MarkedEdge, Id>() {
 
-                                        //get distinct by source node
-                                        @Override
-                                        public Id call( final MarkedEdge markedEdge ) {
-                                            return markedEdge.getSourceNode();
-                                        }
-                                    } ).take( 1 ).count().doOnNext( new Action1<Integer>() {
+                                                //get distinct by source node
+                                                @Override
+                                                public Id call( final MarkedEdge markedEdge ) {
+                                                    return markedEdge.getSourceNode();
+                                                }
+                                            } ).take( 1 ).count().doOnNext( new Action1<Integer>() {
 
-                                        @Override
-                                        public void call( final Integer count ) {
-                                            /**
-                                             * we only want to delete if no edges are in this class. If there are
-                                             * still edges
-                                             * we must retain the information in order to keep our index structure
-                                             * correct for edge
-                                             * iteration
-                                             **/
-                                            if ( count != 0 ) {
-                                                return;
-                                            }
+                                                @Override
+                                                public void call( final Integer count ) {
+                                                    /**
+                                                     * we only want to delete if no edges are in this class. If there
+                                                     * are
+                                                     * still edges
+                                                     * we must retain the information in order to keep our index
+                                                     * structure
+                                                     * correct for edge
+                                                     * iteration
+                                                     **/
+                                                    if ( count != 0 ) {
+                                                        return;
+                                                    }
 
 
-                                            batch.mergeShallow( edgeMetadataSerialization
-                                                    .removeIdTypeToTarget( scope, targetId, edgeType, sourceIdType,
-                                                            version ) );
-                                        }
-                                    } );
+                                                    batch.mergeShallow( edgeMetadataSerialization
+                                                            .removeIdTypeToTarget( scope, targetId, edgeType,
+                                                                    sourceIdType, version ) );
+                                                }
+                                            } );
 
-                            checks.add( search );
-                        }
+                                    checks.add( search );
+                                }
 
 
-                        /**
-                         * Sum up the total number of edges we had, then execute the mutation if we have anything to do
-                         */
-                        return MathObservable.sumInteger(Observable.merge( checks )).doOnNext( new Action1<Integer>() {
-                            @Override
-                            public void call( final Integer count ) {
+                                /**
+                                 * Sum up the total number of edges we had, then execute the mutation if we have
+                                 * anything to do
+                                 */
+                                return MathObservable.sumInteger( Observable.merge( checks ) )
+                                                     .doOnNext( new Action1<Integer>() {
+                                                         @Override
+                                                         public void call( final Integer count ) {
+
+                                                             if ( batch.isEmpty() ) {
+                                                                 return;
+                                                             }
+
+                                                             try {
+                                                                 batch.execute();
+                                                             }
+                                                             catch ( ConnectionException e ) {
+                                                                 throw new RuntimeException(
+                                                                         "Unable to execute mutation", e );
+                                                             }
+                                                         }
+                                                     } );
+                            }
+                        } )
+                                //if we get no edges, emit a 0 so the caller knows nothing was deleted
+                        .defaultIfEmpty( 0 );
 
-                                if(batch.isEmpty()){
-                                    return;
-                                }
 
-                                try {
-                                    batch.execute();
-                                }
-                                catch ( ConnectionException e ) {
-                                    throw new RuntimeException( "Unable to execute mutation", e );
-                                }
-                            }
-                        } );
-
-                    }
-
-                } )
-                .map( new Func1<Integer, Integer>() {
-                    @Override
-                    public Integer call( final Integer subTypes ) {
-
-                        /**
-                         * We can only execute deleting this type if no sub types were deleted
-                         */
-                        if(subTypes != 0){
-                            return subTypes;
-                        }
-
-                        try {
-                            edgeMetadataSerialization.removeEdgeTypeToTarget( scope, targetId, edgeType, version )
-                                                         .execute();
-                        }
-                        catch ( ConnectionException e ) {
-                            throw new RuntimeException( "Unable to execute mutation" );
-                        }
-
-                        return subTypes;
-                    }
-                } )
-                //if we get no edges, emit a 0 so the caller knows nothing was deleted
-                .defaultIfEmpty( 0 );
+        //sum up everything emitted by sub types.  If there's no edges existing for all sub types,
+        // then we can safely remove them
+        return MathObservable.sumInteger( deleteCounts ).map( new Func1<Integer, Integer>() {
+            @Override
+            public Integer call( final Integer subTypes ) {
+
+                /**
+                 * We can only execute deleting this type if no sub types were deleted
+                 */
+                if ( subTypes != 0 ) {
+                    return subTypes;
+                }
+
+                try {
+                    edgeMetadataSerialization.removeEdgeTypeToTarget( scope, targetId, edgeType, version ).execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to execute mutation" );
+                }
+
+                return subTypes;
+            }
+        } );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/78dc432d/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
index c17676b..5f57e64 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
@@ -20,7 +20,9 @@
 package org.apache.usergrid.persistence.graph.impl.stage;
 
 
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Set;
 import java.util.UUID;
 
 import org.jukito.JukitoRunner;
@@ -35,6 +37,7 @@ import org.apache.usergrid.persistence.collection.OrganizationScope;
 import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
 import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
@@ -59,8 +62,8 @@ import static org.mockito.Mockito.when;
  *
  *
  */
-@RunWith(JukitoRunner.class)
-@UseModules({ TestGraphModule.class })
+@RunWith( JukitoRunner.class )
+@UseModules( { TestGraphModule.class } )
 public class EdgeAsyncTest {
 
 
@@ -85,6 +88,9 @@ public class EdgeAsyncTest {
     @Inject
     protected EdgeMetadataSerialization edgeMetadataSerialization;
 
+    @Inject
+    protected GraphFig graphFig;
+
     protected OrganizationScope scope;
 
 
@@ -102,53 +108,180 @@ public class EdgeAsyncTest {
 
 
     @Test
-    public void cleanTargetNoEdgesNoMeta(){
-       //do no writes, then execute a cleanup with no meta data
+    public void cleanTargetNoEdgesNoMeta() {
+        //do no writes, then execute a cleanup with no meta data
 
-        final Id targetId = createId ("target" );
+        final Id targetId = createId( "target" );
         final String test = "test";
         final UUID version = UUIDGenerator.newTimeUUID();
 
         int value = edgeAsync.clearTargets( scope, targetId, test, version ).toBlockingObservable().single();
 
-        assertEquals("No subtypes found", 0, value);
+        assertEquals( "No subtypes found", 0, value );
     }
 
+
     @Test
-    public void cleanTargetSingleEge() throws ConnectionException {
+    public void cleanTargetSingleEdge() throws ConnectionException {
         Edge edge = createEdge( "source", "test", "target" );
 
         edgeSerialization.writeEdge( scope, edge ).execute();
 
         edgeMetadataSerialization.writeEdge( scope, edge ).execute();
 
-        int value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() ).toBlockingObservable().single();
+        int value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() )
+                             .toBlockingObservable().single();
 
-        assertEquals("No subtypes removed, edge exists", 1, value);
+        assertEquals( "No subtypes removed, edge exists", 1, value );
 
         //now delete the edge
 
         edgeSerialization.deleteEdge( scope, edge ).execute();
 
-        value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() ).toBlockingObservable().single();
+        value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() )
+                         .toBlockingObservable().single();
 
-        assertEquals("Single subtype should be removed", 0, value);
+        assertEquals( "Single subtype should be removed", 0, value );
 
         //now verify they're gone
 
-        Iterator<String> edgeTypes = edgeMetadataSerialization.getEdgeTypesToTarget( scope,
-                new SimpleSearchEdgeType( edge.getTargetNode(), null ) );
+        Iterator<String> edgeTypes = edgeMetadataSerialization
+                .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( edge.getTargetNode(), null ) );
+
+        assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+        Iterator<String> sourceTypes = edgeMetadataSerialization
+                .getIdTypesToTarget( scope, new SimpleSearchIdType( edge.getTargetNode(), edge.getType(), null ) );
+
+        assertFalse( "No edge types exist", sourceTypes.hasNext() );
+    }
+
+
+    @Test
+    public void cleanTargetMultipleEdge() throws ConnectionException {
+
+        Id targetId = createId( "target" );
+
+        Edge edge1 = createEdge( createId("source1"), "test", targetId);
+
+
+
+        edgeSerialization.writeEdge( scope, edge1 ).execute();
+
+        edgeMetadataSerialization.writeEdge( scope, edge1 ).execute();
+
+        Edge edge2 = createEdge( createId("source2"), "test", targetId );
+
+        edgeSerialization.writeEdge( scope, edge2 ).execute();
+
+        edgeMetadataSerialization.writeEdge( scope, edge2 ).execute();
+
+        Edge edge3 = createEdge( createId("source3"), "test", targetId );
+
+        edgeSerialization.writeEdge( scope, edge3 ).execute();
+
+        edgeMetadataSerialization.writeEdge( scope, edge3 ).execute();
+
+
+        UUID cleanupVersion = UUIDGenerator.newTimeUUID();
+
+        int value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+                             .toBlockingObservable().single();
+
+        assertEquals( "No subtypes removed, edges exist", 3, value );
+
+        //now delete the edge
+
+        edgeSerialization.deleteEdge( scope, edge1 ).execute();
+
+        value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+                         .toBlockingObservable().single();
+
+        assertEquals( "No subtypes removed, edges exist", 2, value );
+
+        edgeSerialization.deleteEdge( scope, edge2 ).execute();
 
-        assertFalse("No edge types exist", edgeTypes.hasNext());
+        value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+                         .toBlockingObservable().single();
 
+        assertEquals( "No subtypes removed, edges exist", 1, value );
 
-        Iterator<String> sourceTypes = edgeMetadataSerialization.getIdTypesToTarget( scope, new SimpleSearchIdType( edge.getTargetNode(), edge.getType(), null ) );
+        edgeSerialization.deleteEdge( scope, edge3 ).execute();
 
-        assertFalse("No edge types exist", sourceTypes.hasNext());
+        value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+                         .toBlockingObservable().single();
 
 
+        assertEquals( "Single subtype should be removed", 0, value );
 
+        //now verify they're gone
+
+        Iterator<String> edgeTypes = edgeMetadataSerialization
+                .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( edge1.getTargetNode(), null ) );
+
+        assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+        Iterator<String> sourceTypes = edgeMetadataSerialization
+                .getIdTypesToTarget( scope, new SimpleSearchIdType( edge1.getTargetNode(), edge1.getType(), null ) );
+
+        assertFalse( "No edge types exist", sourceTypes.hasNext() );
     }
 
 
+    @Test
+    public void cleanTargetMultipleEdgeBuffer() throws ConnectionException {
+
+        final Id targetId = createId( "target" );
+        final String edgeType = "test";
+
+        final int size =  graphFig.getRepairConcurrentSize()*2;
+
+        Set<Edge> writtenEdges = new HashSet<Edge>();
+
+
+        for(int i = 0; i < size; i ++){
+            Edge edge = createEdge( createId("source"+i), edgeType, targetId);
+
+            edgeSerialization.writeEdge( scope, edge ).execute();
+
+            edgeMetadataSerialization.writeEdge( scope, edge ).execute();
+
+            writtenEdges.add( edge );
+        }
+
+
+        UUID cleanupVersion = UUIDGenerator.newTimeUUID();
+
+        int value = edgeAsync.clearTargets( scope, targetId, edgeType, cleanupVersion )
+                             .toBlockingObservable().single();
+
+        assertEquals( "No subtypes removed, edges exist", size, value );
+
+        //now delete the edge
+
+        for(Edge created: writtenEdges){
+            edgeSerialization.deleteEdge( scope, created ).execute();
+        }
+
+
+        value = edgeAsync.clearTargets( scope, targetId, edgeType, cleanupVersion )
+                                     .toBlockingObservable().single();
+
+        assertEquals( "Subtypes removed", 0, value );
+
+        //now verify they're gone
+
+        Iterator<String> edgeTypes = edgeMetadataSerialization
+                .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( targetId, null ) );
+
+        assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+        Iterator<String> sourceTypes = edgeMetadataSerialization
+                .getIdTypesToTarget( scope, new SimpleSearchIdType( targetId, edgeType, null ) );
+
+        assertFalse( "No edge types exist", sourceTypes.hasNext() );
+    }
 }