You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/25 16:21:11 UTC
[12/43] git commit: Passing tests for meta data cleanup.
Passing tests for meta data cleanup.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/78dc432d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/78dc432d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/78dc432d
Branch: refs/heads/two-dot-o
Commit: 78dc432d1ed80ca96776657ced8961fee34265d5
Parents: 4dd96aa
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 12 12:57:08 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 12 12:57:08 2014 -0700
----------------------------------------------------------------------
.../graph/impl/stage/EdgeAsyncImpl.java | 180 ++++++++++---------
.../graph/impl/stage/EdgeAsyncTest.java | 165 +++++++++++++++--
2 files changed, 242 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/78dc432d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
index cb2cf6d..ba21bf0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncImpl.java
@@ -38,7 +38,6 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
-import org.apache.usergrid.persistence.graph.serialization.util.EdgeUtils;
import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Preconditions;
@@ -83,7 +82,7 @@ public class EdgeAsyncImpl implements EdgeAsync {
@Override
public Observable<Integer> cleanSources( final OrganizationScope scope, final Id sourceId, final String edgeType,
- final UUID version ) {
+ final UUID version ) {
return null; //To change body of implemented methods use File | Settings | File Templates.
@@ -100,107 +99,114 @@ public class EdgeAsyncImpl implements EdgeAsync {
Preconditions.checkNotNull( version, "version is required" );
+ Observable<Integer> deleteCounts =
+ loadEdgeIdsToTarget( scope, new SimpleSearchIdType( targetId, edgeType, null ) )
+ .buffer( graphFig.getRepairConcurrentSize() )
+ //buffer them into concurrent groups based on the concurrent repair size
+ .flatMap( new Func1<List<String>, Observable<Integer>>() {
- return loadEdgeIdsToTarget( scope, new SimpleSearchIdType( targetId, edgeType, null ) )
- .buffer( graphFig.getRepairConcurrentSize() )
- //buffer them into concurrent groups based on the concurrent repair size
- .flatMap( new Func1<List<String>, Observable<Integer>>() {
-
- @Override
- public Observable<Integer> call( final List<String> types ) {
+ @Override
+ public Observable<Integer> call( final List<String> types ) {
- final MutationBatch batch = keyspace.prepareMutationBatch();
+ final MutationBatch batch = keyspace.prepareMutationBatch();
- final List<Observable<Integer>> checks = new ArrayList<Observable<Integer>>( types.size() );
+ final List<Observable<Integer>> checks =
+ new ArrayList<Observable<Integer>>( types.size() );
- //for each id type, check if the exist in parallel to increase processing speed
- for ( final String sourceIdType : types ) {
+ //for each id type, check if they exist in parallel to increase processing speed
+ for ( final String sourceIdType : types ) {
- final SearchByIdType searchData = new SimpleSearchByIdType( targetId, edgeType, version, sourceIdType, null );
+ final SearchByIdType searchData =
+ new SimpleSearchByIdType( targetId, edgeType, version, sourceIdType, null );
- Observable<Integer> search = getEdgesToTargetBySourceType( scope, searchData
- )
- .distinctUntilChanged( new Func1<MarkedEdge, Id>() {
+ Observable<Integer> search = getEdgesToTargetBySourceType( scope, searchData )
+ .distinctUntilChanged( new Func1<MarkedEdge, Id>() {
- //get distinct by source node
- @Override
- public Id call( final MarkedEdge markedEdge ) {
- return markedEdge.getSourceNode();
- }
- } ).take( 1 ).count().doOnNext( new Action1<Integer>() {
+ //get distinct by source node
+ @Override
+ public Id call( final MarkedEdge markedEdge ) {
+ return markedEdge.getSourceNode();
+ }
+ } ).take( 1 ).count().doOnNext( new Action1<Integer>() {
- @Override
- public void call( final Integer count ) {
- /**
- * we only want to delete if no edges are in this class. If there are
- * still edges
- * we must retain the information in order to keep our index structure
- * correct for edge
- * iteration
- **/
- if ( count != 0 ) {
- return;
- }
+ @Override
+ public void call( final Integer count ) {
+ /**
+ * We only want to delete if no edges are in this class. If there
+ * are still edges, we must retain the information in order to keep
+ * our index structure correct for edge iteration.
+ **/
+ if ( count != 0 ) {
+ return;
+ }
- batch.mergeShallow( edgeMetadataSerialization
- .removeIdTypeToTarget( scope, targetId, edgeType, sourceIdType,
- version ) );
- }
- } );
+ batch.mergeShallow( edgeMetadataSerialization
+ .removeIdTypeToTarget( scope, targetId, edgeType,
+ sourceIdType, version ) );
+ }
+ } );
- checks.add( search );
- }
+ checks.add( search );
+ }
- /**
- * Sum up the total number of edges we had, then execute the mutation if we have anything to do
- */
- return MathObservable.sumInteger(Observable.merge( checks )).doOnNext( new Action1<Integer>() {
- @Override
- public void call( final Integer count ) {
+ /**
+ * Sum up the total number of edges we had, then execute the mutation if we have
+ * anything to do
+ */
+ return MathObservable.sumInteger( Observable.merge( checks ) )
+ .doOnNext( new Action1<Integer>() {
+ @Override
+ public void call( final Integer count ) {
+
+ if ( batch.isEmpty() ) {
+ return;
+ }
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException(
+ "Unable to execute mutation", e );
+ }
+ }
+ } );
+ }
+ } )
+ //if we get no edges, emit a 0 so the caller knows nothing was deleted
+ .defaultIfEmpty( 0 );
- if(batch.isEmpty()){
- return;
- }
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to execute mutation", e );
- }
- }
- } );
-
- }
-
- } )
- .map( new Func1<Integer, Integer>() {
- @Override
- public Integer call( final Integer subTypes ) {
-
- /**
- * We can only execute deleting this type if no sub types were deleted
- */
- if(subTypes != 0){
- return subTypes;
- }
-
- try {
- edgeMetadataSerialization.removeEdgeTypeToTarget( scope, targetId, edgeType, version )
- .execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to execute mutation" );
- }
-
- return subTypes;
- }
- } )
- //if we get no edges, emit a 0 so the caller knows nothing was deleted
- .defaultIfEmpty( 0 );
+ //sum up everything emitted by sub types. If no edges exist for any sub type,
+ // then we can safely remove the edge type
+ return MathObservable.sumInteger( deleteCounts ).map( new Func1<Integer, Integer>() {
+ @Override
+ public Integer call( final Integer subTypes ) {
+
+ /**
+ * We can only delete this edge type if no sub type still has edges (the summed count is 0)
+ */
+ if ( subTypes != 0 ) {
+ return subTypes;
+ }
+
+ try {
+ edgeMetadataSerialization.removeEdgeTypeToTarget( scope, targetId, edgeType, version ).execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute mutation" );
+ }
+
+ return subTypes;
+ }
+ } );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/78dc432d/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
index c17676b..5f57e64 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeAsyncTest.java
@@ -20,7 +20,9 @@
package org.apache.usergrid.persistence.graph.impl.stage;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.Set;
import java.util.UUID;
import org.jukito.JukitoRunner;
@@ -35,6 +37,7 @@ import org.apache.usergrid.persistence.collection.OrganizationScope;
import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
@@ -59,8 +62,8 @@ import static org.mockito.Mockito.when;
*
*
*/
-@RunWith(JukitoRunner.class)
-@UseModules({ TestGraphModule.class })
+@RunWith( JukitoRunner.class )
+@UseModules( { TestGraphModule.class } )
public class EdgeAsyncTest {
@@ -85,6 +88,9 @@ public class EdgeAsyncTest {
@Inject
protected EdgeMetadataSerialization edgeMetadataSerialization;
+ @Inject
+ protected GraphFig graphFig;
+
protected OrganizationScope scope;
@@ -102,53 +108,180 @@ public class EdgeAsyncTest {
@Test
- public void cleanTargetNoEdgesNoMeta(){
- //do no writes, then execute a cleanup with no meta data
+ public void cleanTargetNoEdgesNoMeta() {
+ //do no writes, then execute a cleanup with no meta data
- final Id targetId = createId ("target" );
+ final Id targetId = createId( "target" );
final String test = "test";
final UUID version = UUIDGenerator.newTimeUUID();
int value = edgeAsync.clearTargets( scope, targetId, test, version ).toBlockingObservable().single();
- assertEquals("No subtypes found", 0, value);
+ assertEquals( "No subtypes found", 0, value );
}
+
@Test
- public void cleanTargetSingleEge() throws ConnectionException {
+ public void cleanTargetSingleEdge() throws ConnectionException {
Edge edge = createEdge( "source", "test", "target" );
edgeSerialization.writeEdge( scope, edge ).execute();
edgeMetadataSerialization.writeEdge( scope, edge ).execute();
- int value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() ).toBlockingObservable().single();
+ int value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() )
+ .toBlockingObservable().single();
- assertEquals("No subtypes removed, edge exists", 1, value);
+ assertEquals( "No subtypes removed, edge exists", 1, value );
//now delete the edge
edgeSerialization.deleteEdge( scope, edge ).execute();
- value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() ).toBlockingObservable().single();
+ value = edgeAsync.clearTargets( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() )
+ .toBlockingObservable().single();
- assertEquals("Single subtype should be removed", 0, value);
+ assertEquals( "Single subtype should be removed", 0, value );
//now verify they're gone
- Iterator<String> edgeTypes = edgeMetadataSerialization.getEdgeTypesToTarget( scope,
- new SimpleSearchEdgeType( edge.getTargetNode(), null ) );
+ Iterator<String> edgeTypes = edgeMetadataSerialization
+ .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( edge.getTargetNode(), null ) );
+
+ assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+ Iterator<String> sourceTypes = edgeMetadataSerialization
+ .getIdTypesToTarget( scope, new SimpleSearchIdType( edge.getTargetNode(), edge.getType(), null ) );
+
+ assertFalse( "No edge types exist", sourceTypes.hasNext() );
+ }
+
+
+ @Test
+ public void cleanTargetMultipleEdge() throws ConnectionException {
+
+ Id targetId = createId( "target" );
+
+ Edge edge1 = createEdge( createId("source1"), "test", targetId);
+
+
+
+ edgeSerialization.writeEdge( scope, edge1 ).execute();
+
+ edgeMetadataSerialization.writeEdge( scope, edge1 ).execute();
+
+ Edge edge2 = createEdge( createId("source2"), "test", targetId );
+
+ edgeSerialization.writeEdge( scope, edge2 ).execute();
+
+ edgeMetadataSerialization.writeEdge( scope, edge2 ).execute();
+
+ Edge edge3 = createEdge( createId("source3"), "test", targetId );
+
+ edgeSerialization.writeEdge( scope, edge3 ).execute();
+
+ edgeMetadataSerialization.writeEdge( scope, edge3 ).execute();
+
+
+ UUID cleanupVersion = UUIDGenerator.newTimeUUID();
+
+ int value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+ .toBlockingObservable().single();
+
+ assertEquals( "No subtypes removed, edges exist", 3, value );
+
+ //now delete the edge
+
+ edgeSerialization.deleteEdge( scope, edge1 ).execute();
+
+ value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+ .toBlockingObservable().single();
+
+ assertEquals( "No subtypes removed, edges exist", 2, value );
+
+ edgeSerialization.deleteEdge( scope, edge2 ).execute();
- assertFalse("No edge types exist", edgeTypes.hasNext());
+ value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+ .toBlockingObservable().single();
+ assertEquals( "No subtypes removed, edges exist", 1, value );
- Iterator<String> sourceTypes = edgeMetadataSerialization.getIdTypesToTarget( scope, new SimpleSearchIdType( edge.getTargetNode(), edge.getType(), null ) );
+ edgeSerialization.deleteEdge( scope, edge3 ).execute();
- assertFalse("No edge types exist", sourceTypes.hasNext());
+ value = edgeAsync.clearTargets( scope, edge1.getTargetNode(), edge1.getType(), cleanupVersion )
+ .toBlockingObservable().single();
+ assertEquals( "Single subtype should be removed", 0, value );
+ //now verify they're gone
+
+ Iterator<String> edgeTypes = edgeMetadataSerialization
+ .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( edge1.getTargetNode(), null ) );
+
+ assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+ Iterator<String> sourceTypes = edgeMetadataSerialization
+ .getIdTypesToTarget( scope, new SimpleSearchIdType( edge1.getTargetNode(), edge1.getType(), null ) );
+
+ assertFalse( "No edge types exist", sourceTypes.hasNext() );
}
+ @Test
+ public void cleanTargetMultipleEdgeBuffer() throws ConnectionException {
+
+ final Id targetId = createId( "target" );
+ final String edgeType = "test";
+
+ final int size = graphFig.getRepairConcurrentSize()*2;
+
+ Set<Edge> writtenEdges = new HashSet<Edge>();
+
+
+ for(int i = 0; i < size; i ++){
+ Edge edge = createEdge( createId("source"+i), edgeType, targetId);
+
+ edgeSerialization.writeEdge( scope, edge ).execute();
+
+ edgeMetadataSerialization.writeEdge( scope, edge ).execute();
+
+ writtenEdges.add( edge );
+ }
+
+
+ UUID cleanupVersion = UUIDGenerator.newTimeUUID();
+
+ int value = edgeAsync.clearTargets( scope, targetId, edgeType, cleanupVersion )
+ .toBlockingObservable().single();
+
+ assertEquals( "No subtypes removed, edges exist", size, value );
+
+ //now delete the edges
+
+ for(Edge created: writtenEdges){
+ edgeSerialization.deleteEdge( scope, created ).execute();
+ }
+
+
+ value = edgeAsync.clearTargets( scope, targetId, edgeType, cleanupVersion )
+ .toBlockingObservable().single();
+
+ assertEquals( "Subtypes removed", 0, value );
+
+ //now verify they're gone
+
+ Iterator<String> edgeTypes = edgeMetadataSerialization
+ .getEdgeTypesToTarget( scope, new SimpleSearchEdgeType( targetId, null ) );
+
+ assertFalse( "No edge types exist", edgeTypes.hasNext() );
+
+
+ Iterator<String> sourceTypes = edgeMetadataSerialization
+ .getIdTypesToTarget( scope, new SimpleSearchIdType( targetId, edgeType, null ) );
+
+ assertFalse( "No edge types exist", sourceTypes.hasNext() );
+ }
}