You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/25 16:21:17 UTC
[18/43] git commit: Fixed tests and listeners.
Fixed tests and listeners.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6db41cd8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6db41cd8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6db41cd8
Branch: refs/heads/two-dot-o
Commit: 6db41cd8eaedd3aba60e2f176ce5045d4407c8c9
Parents: 79fa0cb
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 18 17:28:47 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 18 17:31:20 2014 -0700
----------------------------------------------------------------------
.../graph/impl/NodeDeleteListener.java | 9 ++-
.../graph/impl/stage/AbstractEdgeRepair.java | 27 ++++++---
.../graph/impl/stage/EdgeDeleteRepair.java | 10 ++-
.../graph/impl/stage/EdgeWriteRepair.java | 7 ++-
.../graph/impl/stage/EdgeDeleteRepairTest.java | 64 ++++++++++++++------
.../graph/impl/stage/EdgeWriteRepairTest.java | 61 ++++++++++++++-----
6 files changed, 131 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db41cd8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index c860fcb..9fcd3a9 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -149,12 +149,13 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
//each time an edge is emitted, delete it via batch mutation since we'll already be buffered
- return Observable.merge( targetEdges, sourceEdges );
+ return Observable.concat( targetEdges, sourceEdges );
}
} ).flatMap( new Func1<MarkedEdge, Observable<MarkedEdge>>() {
@Override
public Observable<MarkedEdge> call( final MarkedEdge edge ) {
+ //delete the newest edge <= the version on the node delete
LOG.debug( "Deleting edge {}", edge );
return edgeDeleteRepair.repair( scope, edge );
@@ -165,7 +166,9 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
public Observable<MarkedEdge> call( final MarkedEdge edge ) {
- //delete both the source and target meta data in parallel.
+
+ //delete both the source and target meta data in parallel for the edge we deleted in the previous step
+ //if nothing else is using them
Observable<Integer> sourceMetaRepaired =
edgeMetaRepair.repairSources( scope, edge.getSourceNode(), edge.getType(), version );
@@ -173,7 +176,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
edge.getTargetNode(), edge.getType(), version );
//sum up the number of subtypes we retain
- return Observable.merge(sourceMetaRepaired, targetMetaRepaired ).last().map( new Func1
+ return Observable.concat(sourceMetaRepaired, targetMetaRepaired ).last().map( new Func1
<Integer, MarkedEdge>() {
@Override
public MarkedEdge call( final Integer integer ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db41cd8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
index 4b06760..28d0622 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
@@ -20,8 +20,10 @@
package org.apache.usergrid.persistence.graph.impl.stage;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
+import java.util.PriorityQueue;
import java.util.UUID;
import org.slf4j.Logger;
@@ -34,8 +36,10 @@ import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+import org.apache.usergrid.persistence.model.entity.Id;
import com.fasterxml.uuid.UUIDComparator;
+import com.fasterxml.uuid.impl.UUIDUtil;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
@@ -45,6 +49,7 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.Scheduler;
import rx.functions.Func1;
+import rx.functions.Func2;
/**
@@ -85,8 +90,9 @@ public abstract class AbstractEdgeRepair {
Observable<MarkedEdge> targetEdges = getEdgeVersionsToTarget( scope, edge );
+
//merge source and target then deal with the distinct values
- return Observable.merge( sourceEdges, targetEdges ).filter(getFilter(maxVersion)).distinctUntilChanged().buffer( graphFig.getScanPageSize() )
+ return Observable.merge( sourceEdges, targetEdges ).filter( getFilter( maxVersion ) ).distinctUntilChanged().buffer( graphFig.getScanPageSize() )
.flatMap( new Func1<List<MarkedEdge>, Observable<MarkedEdge>>() {
@Override
public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
@@ -129,9 +135,7 @@ public abstract class AbstractEdgeRepair {
@Override
protected Iterator<MarkedEdge> getIterator() {
- final SimpleSearchByEdge search =
- new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
- edge.getVersion(), null );
+ final SimpleSearchByEdge search = getSearchByEdge(edge);
return edgeSerialization.getEdgeFromSource( scope, search );
}
@@ -148,12 +152,21 @@ public abstract class AbstractEdgeRepair {
@Override
protected Iterator<MarkedEdge> getIterator() {
- final SimpleSearchByEdge search =
- new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
- edge.getVersion(), null );
+ final SimpleSearchByEdge search = getSearchByEdge(edge);
return edgeSerialization.getEdgeToTarget( scope, search );
}
} ).subscribeOn( scheduler );
}
+
+
+ /**
+ * Construct the search params for the edge
+ * @param edge
+ * @return
+ */
+ private SimpleSearchByEdge getSearchByEdge(final Edge edge){
+ return new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
+ edge.getVersion(), null );
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db41cd8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepair.java
index 487472c..5ac6b8b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepair.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepair.java
@@ -32,10 +32,14 @@ import rx.Observable;
*/
public interface EdgeDeleteRepair {
+
/**
- * Repair this edge. Remove previous entries including this one
- * @param scope
- * @param edge
+ * Repair this edge. Remove previous entries
+ * @param scope The scope to use
+ * @param edge The last edge to retain. All versions <= this edge's version will be deleted
+ *
+ * @return An observable that emits every version of the edge we delete. Note that it may emit duplicates
+ * since this is a streaming API.
*/
public Observable<MarkedEdge> repair( OrganizationScope scope, Edge edge );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db41cd8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java
index 550ddf1..92be7a7 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java
@@ -34,8 +34,11 @@ public interface EdgeWriteRepair {
/**
* Repair this edge. Remove previous entries
- * @param scope
- * @param edge
+ * @param scope The scope to use
+ * @param edge The last edge to retain. All versions < this edge's version will be deleted
+ *
+ * @return An observable that emits every version of the edge we delete. Note that it may emit duplicates
+ * since this is a streaming API.
*/
public Observable<MarkedEdge> repair( OrganizationScope scope, Edge edge );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db41cd8/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
index e8eedc9..dc8a3ac 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
@@ -21,8 +21,10 @@ package org.apache.usergrid.persistence.graph.impl.stage;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import org.jukito.JukitoRunner;
import org.jukito.UseModules;
@@ -31,6 +33,8 @@ import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.OrganizationScope;
import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
@@ -43,6 +47,8 @@ import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
import com.google.inject.Inject;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
@@ -50,6 +56,7 @@ import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.crea
import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -62,6 +69,8 @@ import static org.mockito.Mockito.when;
@UseModules( { TestGraphModule.class } )
public class EdgeDeleteRepairTest {
+ private static final Logger LOG = LoggerFactory.getLogger( EdgeDeleteRepairTest.class );
+
@ClassRule
public static CassandraRule rule = new CassandraRule();
@@ -108,13 +117,12 @@ public class EdgeDeleteRepairTest {
/**
- * Test repairing with no edges
- * TODO: TN. There appears to be a race condition here with ordering. Not sure if this is intentional as part of the impl
- * or if it's an issue
+ * Test repairing with no edges TODO: TN. There appears to be a race condition here with ordering. Not sure if
+ * this is intentional as part of the impl or if it's an issue
*/
@Test
public void versionTest() throws ConnectionException {
- final int size = 10;
+ final int size = 3;
final List<Edge> versions = new ArrayList<Edge>( size );
@@ -122,6 +130,10 @@ public class EdgeDeleteRepairTest {
final Id targetId = createId( "target" );
final String edgeType = "edge";
+ Set<Edge> deletedEdges = new HashSet<Edge>();
+ int deleteIndex = size / 2;
+
+
for ( int i = 0; i < size; i++ ) {
final Edge edge = createEdge( sourceId, edgeType, targetId );
@@ -129,46 +141,62 @@ public class EdgeDeleteRepairTest {
edgeSerialization.writeEdge( scope, edge ).execute();
- System.out.println(String.format("[%d] %s", i, edge));
- }
+ LOG.info( "Writing edge at index [{}] {}", i, edge );
+ if ( i <= deleteIndex ) {
+ deletedEdges.add( edge );
+ }
+ }
- int deleteIndex = size / 2;
Edge keep = versions.get( deleteIndex );
Iterable<MarkedEdge> edges = edgeDeleteRepair.repair( scope, keep ).toBlockingObservable().toIterable();
- int index = 0;
+ Multiset<Edge> deletedStream = HashMultiset.create();
for ( MarkedEdge edge : edges ) {
- final Edge removed = versions.get( deleteIndex - index );
+ LOG.info( "Returned edge {} for repair", edge );
- assertEquals( "Removed matches saved index", removed, edge );
- index++;
+ final boolean shouldBeDeleted = deletedEdges.contains( edge );
+
+ assertTrue( "Removed matches saved index", shouldBeDeleted );
+
+ deletedStream.add( edge );
}
+ deletedEdges.removeAll( deletedStream.elementSet() );
+
+ assertEquals( 0, deletedEdges.size() );
+
+
//now verify we get all the versions we expect back
Iterator<MarkedEdge> iterator = edgeSerialization.getEdgeFromSource( scope,
new SimpleSearchByEdge( sourceId, edgeType, targetId, UUIDGenerator.newTimeUUID(), null ) );
- index = 0;
+ int count = 0;
+
+ for ( MarkedEdge edge : new IterableWrapper<MarkedEdge>( iterator ) ) {
+
+ LOG.info( "Returned edge {} to verify", edge );
+
+ final int index = size - count - 1;
- for(MarkedEdge edge: new IterableWrapper<MarkedEdge>( iterator )){
+ LOG.info( "Checking for correct version at index {}", index );
- final Edge saved = versions.get( size - index - 1);
+ final Edge saved = versions.get( index );
- assertEquals(saved, edge);
+ assertEquals( "Retained edge correct", saved, edge );
- index++;
+ count++;
}
- final int keptCount = size-deleteIndex;
+ final int keptCount = size - deleteIndex;
- assertEquals("Kept edge version was the minimum", keptCount, index+1);
+ assertEquals( "Kept edge version was the minimum", keptCount, count + 1 );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db41cd8/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
index f796c85..3416421 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
@@ -21,8 +21,14 @@ package org.apache.usergrid.persistence.graph.impl.stage;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
import org.jukito.JukitoRunner;
import org.jukito.UseModules;
@@ -31,6 +37,8 @@ import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.OrganizationScope;
import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
@@ -43,6 +51,8 @@ import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
import com.google.inject.Inject;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
@@ -50,6 +60,7 @@ import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.crea
import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -62,6 +73,8 @@ import static org.mockito.Mockito.when;
@UseModules( { TestGraphModule.class } )
public class EdgeWriteRepairTest {
+ private static final Logger LOG = LoggerFactory.getLogger( EdgeWriteRepairTest.class );
+
@ClassRule
public static CassandraRule rule = new CassandraRule();
@@ -107,6 +120,7 @@ public class EdgeWriteRepairTest {
}
+
/**
* Test repairing with no edges
* TODO: TN. There appears to be a race condition here with ordering. Not sure if this is intentional as part of the impl
@@ -114,7 +128,7 @@ public class EdgeWriteRepairTest {
*/
@Test
public void versionTest() throws ConnectionException {
- final int size = 10;
+ final int size = 20;
final List<Edge> versions = new ArrayList<Edge>( size );
@@ -122,6 +136,10 @@ public class EdgeWriteRepairTest {
final Id targetId = createId( "target" );
final String edgeType = "edge";
+ int deleteIndex = size / 2;
+
+ Set<Edge> deletedEdges = new HashSet<Edge>();
+
for ( int i = 0; i < size; i++ ) {
final Edge edge = createEdge( sourceId, edgeType, targetId );
@@ -129,44 +147,56 @@ public class EdgeWriteRepairTest {
edgeSerialization.writeEdge( scope, edge ).execute();
- System.out.println(String.format("[%d] %s", i, edge));
- }
+ LOG.info( "Writing edge at index [{}] {}", i, edge );
+ if(i < deleteIndex){
+ deletedEdges.add( edge );
+ }
- int keepIndex = size / 2;
- Edge keep = versions.get( keepIndex );
+ }
- Iterable<MarkedEdge> edges = edgeWriteRepair.repair( scope, keep ).toBlockingObservable().toIterable();
+ Edge keep = versions.get( deleteIndex );
+
+ Iterable<MarkedEdge> edges = edgeWriteRepair.repair( scope, keep ).toBlockingObservable().toIterable();
- int index = 0;
+ Multiset<Edge> deletedStream = HashMultiset.create();
for ( MarkedEdge edge : edges ) {
- final Edge removed = versions.get( keepIndex - index -1 );
+ LOG.info("Returned edge {} for repair", edge);
- assertEquals( "Removed matches saved index", removed, edge );
+ final boolean shouldBeDeleted = deletedEdges.contains( edge );
+
+ assertTrue( "Removed matches saved index", shouldBeDeleted );
+
+ deletedStream.add( edge );
- index++;
}
+ deletedEdges.removeAll( deletedStream.elementSet() );
+
+ assertEquals(0, deletedEdges.size());
+
//now verify we get all the versions we expect back
Iterator<MarkedEdge> iterator = edgeSerialization.getEdgeFromSource( scope,
new SimpleSearchByEdge( sourceId, edgeType, targetId, UUIDGenerator.newTimeUUID(), null ) );
- index = 0;
+ int count = 0;
for(MarkedEdge edge: new IterableWrapper<MarkedEdge>( iterator )){
- final Edge saved = versions.get( size - index -1 );
+ final Edge saved = versions.get( size - count -1 );
assertEquals(saved, edge);
- index++;
+ count++;
}
- assertEquals("Kept edge version was the minimum", keepIndex, index);
+ final int keptCount = size-deleteIndex;
+
+ assertEquals("Kept edge version was the minimum", keptCount, count);
}
@@ -184,4 +214,7 @@ public class EdgeWriteRepairTest {
return this.sourceIterator;
}
}
+
+
+
}