You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/25 16:21:17 UTC

[18/43] git commit: Fixed tests and listeners.

Fixed tests and listeners.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6db41cd8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6db41cd8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6db41cd8

Branch: refs/heads/two-dot-o
Commit: 6db41cd8eaedd3aba60e2f176ce5045d4407c8c9
Parents: 79fa0cb
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 18 17:28:47 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 18 17:31:20 2014 -0700

----------------------------------------------------------------------
 .../graph/impl/NodeDeleteListener.java          |  9 ++-
 .../graph/impl/stage/AbstractEdgeRepair.java    | 27 ++++++---
 .../graph/impl/stage/EdgeDeleteRepair.java      | 10 ++-
 .../graph/impl/stage/EdgeWriteRepair.java       |  7 ++-
 .../graph/impl/stage/EdgeDeleteRepairTest.java  | 64 ++++++++++++++------
 .../graph/impl/stage/EdgeWriteRepairTest.java   | 61 ++++++++++++++-----
 6 files changed, 131 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db41cd8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index c860fcb..9fcd3a9 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -149,12 +149,13 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
 
 
                         //each time an edge is emitted, delete it via batch mutation since we'll already be buffered
-                        return Observable.merge( targetEdges, sourceEdges );
+                        return Observable.concat( targetEdges, sourceEdges );
                     }
                 } ).flatMap( new Func1<MarkedEdge, Observable<MarkedEdge>>() {
                     @Override
                     public Observable<MarkedEdge> call( final MarkedEdge edge ) {
 
+                        //delete the newest edge <= the version on the node delete
                         LOG.debug( "Deleting edge {}", edge );
                         return edgeDeleteRepair.repair( scope, edge );
 
@@ -165,7 +166,9 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
                     public Observable<MarkedEdge> call( final MarkedEdge edge ) {
 
 
-                        //delete both the source and target meta data in parallel.
+
+                        //delete both the source and target meta data in parallel for the edge we deleted in the previous step
+                        //if nothing else is using them
                         Observable<Integer> sourceMetaRepaired =
                                 edgeMetaRepair.repairSources( scope, edge.getSourceNode(), edge.getType(), version );
 
@@ -173,7 +176,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
                                 edge.getTargetNode(), edge.getType(), version );
 
                         //sum up the number of subtypes we retain
-                        return Observable.merge(sourceMetaRepaired, targetMetaRepaired ).last().map( new Func1
+                        return Observable.concat(sourceMetaRepaired, targetMetaRepaired ).last().map( new Func1
                                 <Integer, MarkedEdge>() {
                             @Override
                             public MarkedEdge call( final Integer integer ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db41cd8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
index 4b06760..28d0622 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
@@ -20,8 +20,10 @@
 package org.apache.usergrid.persistence.graph.impl.stage;
 
 
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.PriorityQueue;
 import java.util.UUID;
 
 import org.slf4j.Logger;
@@ -34,8 +36,10 @@ import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.fasterxml.uuid.UUIDComparator;
+import com.fasterxml.uuid.impl.UUIDUtil;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
@@ -45,6 +49,7 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import rx.Observable;
 import rx.Scheduler;
 import rx.functions.Func1;
+import rx.functions.Func2;
 
 
 /**
@@ -85,8 +90,9 @@ public abstract class AbstractEdgeRepair  {
         Observable<MarkedEdge> targetEdges = getEdgeVersionsToTarget( scope, edge );
 
 
+
         //merge source and target then deal with the distinct values
-        return Observable.merge( sourceEdges, targetEdges ).filter(getFilter(maxVersion)).distinctUntilChanged().buffer( graphFig.getScanPageSize() )
+        return Observable.merge( sourceEdges, targetEdges ).filter( getFilter( maxVersion ) ).distinctUntilChanged().buffer( graphFig.getScanPageSize() )
                          .flatMap( new Func1<List<MarkedEdge>, Observable<MarkedEdge>>() {
                              @Override
                              public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
@@ -129,9 +135,7 @@ public abstract class AbstractEdgeRepair  {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
 
-                final SimpleSearchByEdge search =
-                        new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
-                                edge.getVersion(), null );
+                final SimpleSearchByEdge search = getSearchByEdge(edge);
 
                 return edgeSerialization.getEdgeFromSource( scope, search );
             }
@@ -148,12 +152,21 @@ public abstract class AbstractEdgeRepair  {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
 
-                final SimpleSearchByEdge search =
-                        new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
-                                edge.getVersion(), null );
+                final SimpleSearchByEdge search = getSearchByEdge(edge);
 
                 return edgeSerialization.getEdgeToTarget( scope, search );
             }
         } ).subscribeOn( scheduler );
     }
+
+
+    /**
+     * Construct the search params for the edge
+     * @param edge
+     * @return
+     */
+    private SimpleSearchByEdge getSearchByEdge(final Edge edge){
+        return new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
+                                        edge.getVersion(), null );
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db41cd8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepair.java
index 487472c..5ac6b8b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepair.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepair.java
@@ -32,10 +32,14 @@ import rx.Observable;
  */
 public interface EdgeDeleteRepair {
 
+
     /**
-     * Repair this edge.  Remove previous entries including this one
-     * @param scope
-     * @param edge
+     * Repair this edge.  Remove previous entries
+     * @param scope The scope to use
+     * @param edge The last edge to retain.  All versions  <= this edge's version  will be deleted
+     *
+     * @return An observable that emits every version of the edge we delete.  Note that it may emit duplicates
+     * since this is a streaming API.
      */
     public Observable<MarkedEdge> repair( OrganizationScope scope, Edge edge );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db41cd8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java
index 550ddf1..92be7a7 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java
@@ -34,8 +34,11 @@ public interface EdgeWriteRepair {
 
     /**
      * Repair this edge.  Remove previous entries
-     * @param scope
-     * @param edge
+     * @param scope The scope to use
+     * @param edge The last edge to retain.  All versions  < this edge's version  will be deleted
+     *
+     * @return An observable that emits every version of the edge we delete.  Note that it may emit duplicates
+     * since this is a streaming API.
      */
     public Observable<MarkedEdge> repair( OrganizationScope scope, Edge edge );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db41cd8/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
index e8eedc9..dc8a3ac 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
@@ -21,8 +21,10 @@ package org.apache.usergrid.persistence.graph.impl.stage;
 
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 import org.jukito.JukitoRunner;
 import org.jukito.UseModules;
@@ -31,6 +33,8 @@ import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.OrganizationScope;
 import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
@@ -43,6 +47,8 @@ import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
 import com.google.inject.Inject;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
@@ -50,6 +56,7 @@ import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.crea
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -62,6 +69,8 @@ import static org.mockito.Mockito.when;
 @UseModules( { TestGraphModule.class } )
 public class EdgeDeleteRepairTest {
 
+    private static final Logger LOG = LoggerFactory.getLogger( EdgeDeleteRepairTest.class );
+
     @ClassRule
     public static CassandraRule rule = new CassandraRule();
 
@@ -108,13 +117,12 @@ public class EdgeDeleteRepairTest {
 
 
     /**
-     * Test repairing with no edges
-     * TODO: TN.  There appears to be a race condition here with ordering.  Not sure if this is intentional as part of the impl
-     * or if it's an issue
+     * Test repairing with no edges TODO: TN.  There appears to be a race condition here with ordering.  Not sure if
+     * this is intentional as part of the impl or if it's an issue
      */
     @Test
     public void versionTest() throws ConnectionException {
-        final int size = 10;
+        final int size = 3;
 
         final List<Edge> versions = new ArrayList<Edge>( size );
 
@@ -122,6 +130,10 @@ public class EdgeDeleteRepairTest {
         final Id targetId = createId( "target" );
         final String edgeType = "edge";
 
+        Set<Edge> deletedEdges = new HashSet<Edge>();
+        int deleteIndex = size / 2;
+
+
         for ( int i = 0; i < size; i++ ) {
             final Edge edge = createEdge( sourceId, edgeType, targetId );
 
@@ -129,46 +141,62 @@ public class EdgeDeleteRepairTest {
 
             edgeSerialization.writeEdge( scope, edge ).execute();
 
-            System.out.println(String.format("[%d] %s", i, edge));
-        }
+            LOG.info( "Writing edge at index [{}] {}", i, edge );
 
+            if ( i <= deleteIndex ) {
+                deletedEdges.add( edge );
+            }
+        }
 
-        int deleteIndex = size / 2;
 
         Edge keep = versions.get( deleteIndex );
 
         Iterable<MarkedEdge> edges = edgeDeleteRepair.repair( scope, keep ).toBlockingObservable().toIterable();
 
 
-        int index = 0;
+        Multiset<Edge> deletedStream = HashMultiset.create();
 
         for ( MarkedEdge edge : edges ) {
 
-            final Edge removed = versions.get( deleteIndex - index );
+            LOG.info( "Returned edge {} for repair", edge );
 
-            assertEquals( "Removed matches saved index", removed, edge );
 
-            index++;
+            final boolean shouldBeDeleted = deletedEdges.contains( edge );
+
+            assertTrue( "Removed matches saved index", shouldBeDeleted );
+
+            deletedStream.add( edge );
         }
 
+        deletedEdges.removeAll( deletedStream.elementSet() );
+
+        assertEquals( 0, deletedEdges.size() );
+
+
         //now verify we get all the versions we expect back
         Iterator<MarkedEdge> iterator = edgeSerialization.getEdgeFromSource( scope,
                 new SimpleSearchByEdge( sourceId, edgeType, targetId, UUIDGenerator.newTimeUUID(), null ) );
 
-        index = 0;
+        int count = 0;
+
+        for ( MarkedEdge edge : new IterableWrapper<MarkedEdge>( iterator ) ) {
+
+            LOG.info( "Returned edge {} to verify", edge );
+
+            final int index = size - count - 1;
 
-        for(MarkedEdge edge: new IterableWrapper<MarkedEdge>( iterator )){
+            LOG.info( "Checking for correct version at index {}", index );
 
-            final Edge saved = versions.get( size - index - 1);
+            final Edge saved = versions.get( index );
 
-            assertEquals(saved, edge);
+            assertEquals( "Retained edge correct", saved, edge );
 
-            index++;
+            count++;
         }
 
-        final int keptCount = size-deleteIndex;
+        final int keptCount = size - deleteIndex;
 
-        assertEquals("Kept edge version was the minimum", keptCount, index+1);
+        assertEquals( "Kept edge version was the minimum", keptCount, count + 1 );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db41cd8/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
index f796c85..3416421 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
@@ -21,8 +21,14 @@ package org.apache.usergrid.persistence.graph.impl.stage;
 
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jukito.JukitoRunner;
 import org.jukito.UseModules;
@@ -31,6 +37,8 @@ import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.OrganizationScope;
 import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
@@ -43,6 +51,8 @@ import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
 import com.google.inject.Inject;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
@@ -50,6 +60,7 @@ import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.crea
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -62,6 +73,8 @@ import static org.mockito.Mockito.when;
 @UseModules( { TestGraphModule.class } )
 public class EdgeWriteRepairTest {
 
+    private static final Logger LOG = LoggerFactory.getLogger( EdgeWriteRepairTest.class );
+
     @ClassRule
     public static CassandraRule rule = new CassandraRule();
 
@@ -107,6 +120,7 @@ public class EdgeWriteRepairTest {
     }
 
 
+
     /**
      * Test repairing with no edges
      * TODO: TN.  There appears to be a race condition here with ordering.  Not sure if this is intentional as part of the impl
@@ -114,7 +128,7 @@ public class EdgeWriteRepairTest {
      */
     @Test
     public void versionTest() throws ConnectionException {
-        final int size = 10;
+        final int size = 20;
 
         final List<Edge> versions = new ArrayList<Edge>( size );
 
@@ -122,6 +136,10 @@ public class EdgeWriteRepairTest {
         final Id targetId = createId( "target" );
         final String edgeType = "edge";
 
+        int deleteIndex = size / 2;
+
+        Set<Edge> deletedEdges = new HashSet<Edge>();
+
         for ( int i = 0; i < size; i++ ) {
             final Edge edge = createEdge( sourceId, edgeType, targetId );
 
@@ -129,44 +147,56 @@ public class EdgeWriteRepairTest {
 
             edgeSerialization.writeEdge( scope, edge ).execute();
 
-            System.out.println(String.format("[%d] %s", i, edge));
-        }
+            LOG.info( "Writing edge at index [{}] {}", i, edge );
 
+            if(i < deleteIndex){
+                deletedEdges.add( edge );
+            }
 
-        int keepIndex = size / 2;
 
-        Edge keep = versions.get( keepIndex );
+        }
 
-        Iterable<MarkedEdge> edges = edgeWriteRepair.repair( scope, keep ).toBlockingObservable().toIterable();
 
+        Edge keep = versions.get( deleteIndex );
+
+        Iterable<MarkedEdge> edges = edgeWriteRepair.repair( scope, keep ).toBlockingObservable().toIterable();
 
-        int index = 0;
+        Multiset<Edge> deletedStream = HashMultiset.create();
 
         for ( MarkedEdge edge : edges ) {
 
-            final Edge removed = versions.get( keepIndex - index -1 );
+            LOG.info("Returned edge {} for repair", edge);
 
-            assertEquals( "Removed matches saved index", removed, edge );
+           final boolean shouldBeDeleted = deletedEdges.contains( edge );
+
+            assertTrue( "Removed matches saved index", shouldBeDeleted );
+
+            deletedStream.add( edge );
 
-            index++;
         }
 
+        deletedEdges.removeAll( deletedStream.elementSet() );
+
+        assertEquals(0, deletedEdges.size());
+
         //now verify we get all the versions we expect back
         Iterator<MarkedEdge> iterator = edgeSerialization.getEdgeFromSource( scope,
                 new SimpleSearchByEdge( sourceId, edgeType, targetId, UUIDGenerator.newTimeUUID(), null ) );
 
-        index = 0;
+        int count = 0;
 
         for(MarkedEdge edge: new IterableWrapper<MarkedEdge>( iterator )){
 
-            final Edge saved = versions.get( size - index -1 );
+            final Edge saved = versions.get( size - count -1 );
 
             assertEquals(saved, edge);
 
-            index++;
+            count++;
         }
 
-        assertEquals("Kept edge version was the minimum", keepIndex, index);
+        final int keptCount = size-deleteIndex;
+
+        assertEquals("Kept edge version was the minimum", keptCount, count);
     }
 
 
@@ -184,4 +214,7 @@ public class EdgeWriteRepairTest {
             return this.sourceIterator;
         }
     }
+
+
+
 }