You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/10 12:58:26 UTC
incubator-usergrid git commit: Migrated filter to a transform operation
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-614 99cb70052 -> 59ad6a989
Migrated filter to a transform operation
Updated tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/59ad6a98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/59ad6a98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/59ad6a98
Branch: refs/heads/USERGRID-614
Commit: 59ad6a9897fcdaf31c048a46426873b42915e4b7
Parents: 99cb700
Author: Todd Nine <tn...@apigee.com>
Authored: Sun May 10 04:58:24 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Sun May 10 04:58:24 2015 -0600
----------------------------------------------------------------------
.../graph/impl/GraphManagerImpl.java | 144 ++--
.../graph/impl/SimpleSearchByIdType.java | 8 +
.../persistence/graph/GraphManagerIT.java | 664 ++++++++++++++++---
.../graph/test/util/EdgeTestUtils.java | 27 +
4 files changed, 672 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59ad6a98/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 06cb5a1..e4ce4fd 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -51,7 +51,6 @@ import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@@ -60,7 +59,6 @@ import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
-import rx.functions.Func1;
/**
@@ -293,7 +291,7 @@ public class GraphManagerImpl implements GraphManager {
return storageEdgeSerialization.getEdgeVersions( scope, searchByEdge );
}
} ).buffer( graphFig.getScanPageSize() )
- .flatMap( new EdgeBufferFilter( searchByEdge.getMaxTimestamp(), searchByEdge.filterMarked() ) );
+ .compose( new EdgeBufferFilter( searchByEdge.getMaxTimestamp(), searchByEdge.filterMarked() ) );
return ObservableTimer.time( edges, loadEdgesVersionsTimer );
}
@@ -308,7 +306,7 @@ public class GraphManagerImpl implements GraphManager {
return storageEdgeSerialization.getEdgesFromSource( scope, search );
}
} ).buffer( graphFig.getScanPageSize() )
- .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+ .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
return ObservableTimer.time( edges, loadEdgesFromSourceTimer );
}
@@ -323,7 +321,7 @@ public class GraphManagerImpl implements GraphManager {
return storageEdgeSerialization.getEdgesToTarget( scope, search );
}
} ).buffer( graphFig.getScanPageSize() )
- .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+ .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
return ObservableTimer.time( edges, loadEdgesToTargetTimer );
@@ -334,14 +332,14 @@ public class GraphManagerImpl implements GraphManager {
public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
final Observable<Edge> edges =
Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
- @Override
- protected Iterator<MarkedEdge> getIterator() {
- return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search );
- }
- } ).buffer( graphFig.getScanPageSize() )
- .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+ return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search );
+ }
+ } ).buffer( graphFig.getScanPageSize() )
+ .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
- return ObservableTimer.time( edges, loadEdgesFromSourceTimer );
+ return ObservableTimer.time( edges, loadEdgesFromSourceByTypeTimer );
}
@@ -349,12 +347,12 @@ public class GraphManagerImpl implements GraphManager {
public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
final Observable<Edge> edges =
Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
- @Override
- protected Iterator<MarkedEdge> getIterator() {
- return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search );
- }
- } ).buffer( graphFig.getScanPageSize() )
- .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+ return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search );
+ }
+ } ).buffer( graphFig.getScanPageSize() )
+ .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
return ObservableTimer.time( edges, loadEdgesToTargetByTypeTimer );
}
@@ -362,13 +360,13 @@ public class GraphManagerImpl implements GraphManager {
@Override
public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) {
- final Observable<String> edgeTypes = Observable.create(
- new ObservableIterator<String>( "getEdgeTypesFromSource" ) {
- @Override
- protected Iterator<String> getIterator() {
- return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
- }
- } );
+ final Observable<String> edgeTypes =
+ Observable.create( new ObservableIterator<String>( "getEdgeTypesFromSource" ) {
+ @Override
+ protected Iterator<String> getIterator() {
+ return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
+ }
+ } );
return ObservableTimer.time( edgeTypes, getEdgeTypesFromSourceTimer );
}
@@ -376,12 +374,13 @@ public class GraphManagerImpl implements GraphManager {
@Override
public Observable<String> getIdTypesFromSource( final SearchIdType search ) {
- final Observable<String> edgeTypes = Observable.create( new ObservableIterator<String>( "getIdTypesFromSource" ) {
- @Override
- protected Iterator<String> getIterator() {
- return edgeMetadataSerialization.getIdTypesFromSource( scope, search );
- }
- } );
+ final Observable<String> edgeTypes =
+ Observable.create( new ObservableIterator<String>( "getIdTypesFromSource" ) {
+ @Override
+ protected Iterator<String> getIterator() {
+ return edgeMetadataSerialization.getIdTypesFromSource( scope, search );
+ }
+ } );
return ObservableTimer.time( edgeTypes, getIdTypesFromSourceTimer );
}
@@ -389,22 +388,21 @@ public class GraphManagerImpl implements GraphManager {
@Override
public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) {
- final Observable<String> edgeTypes = Observable.create(
- new ObservableIterator<String>( "getEdgeTypesToTarget" ) {
- @Override
- protected Iterator<String> getIterator() {
- return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
- }
- } );
+ final Observable<String> edgeTypes =
+ Observable.create( new ObservableIterator<String>( "getEdgeTypesToTarget" ) {
+ @Override
+ protected Iterator<String> getIterator() {
+ return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
+ }
+ } );
- return ObservableTimer.time( edgeTypes, getEdgeTypesFromSourceTimer );
+ return ObservableTimer.time( edgeTypes, getEdgeTypesToTargetTimer );
}
@Override
public Observable<String> getIdTypesToTarget( final SearchIdType search ) {
- final Observable<String> edgeTypes = Observable.create(
- new ObservableIterator<String>( "getIdTypesToTarget" ) {
+ final Observable<String> edgeTypes = Observable.create( new ObservableIterator<String>( "getIdTypesToTarget" ) {
@Override
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getIdTypesToTarget( scope, search );
@@ -418,7 +416,9 @@ public class GraphManagerImpl implements GraphManager {
/**
* Helper filter to perform mapping and return an observable of pre-filtered edges
*/
- private class EdgeBufferFilter implements Func1<List<MarkedEdge>, Observable<MarkedEdge>> {
+ private class EdgeBufferFilter implements
+ Observable.Transformer<List<MarkedEdge>, MarkedEdge> {//implements Func1<List<MarkedEdge>,
+ // Observable<MarkedEdge>> {
private final long maxVersion;
private final boolean filterMarked;
@@ -430,46 +430,60 @@ public class GraphManagerImpl implements GraphManager {
}
+ @Override
+
/**
* Takes a buffered list of marked edges. It then does a single round trip to fetch marked ids These are then
* used in conjunction with the max version filter to filter any edges that should not be returned
*
* @return An observable that emits only edges that can be consumed. There could be multiple versions of the
* same edge so those need de-duped.
- */
- @Override
- public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
+ */ public Observable<MarkedEdge> call( final Observable<List<MarkedEdge>> markedEdgesObservable ) {
- final Map<Id, Long> markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges );
- final long maxTimestamp = maxVersion;
+ return markedEdgesObservable.flatMap( markedEdges -> {
- return Observable.from( markedEdges ).filter( edge -> {
- final long edgeTimestamp = edge.getTimestamp();
+ final Observable<MarkedEdge> markedEdgeObservable = Observable.from( markedEdges );
- //our edge needs to not be deleted and have a version that's > max Version
- if (( edge.isDeleted() && filterMarked) || Long.compare( edgeTimestamp, maxTimestamp ) > 0 ) {
- return false;
+ /**
+ * We aren't going to filter anything, return exactly what we're passed
+ */
+ if(!filterMarked){
+ return markedEdgeObservable;
}
+ //We need to filter, perform that filter
+ final Map<Id, Long> markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges );
- final Long sourceTimestamp = markedVersions.get( edge.getSourceNode() );
+ return markedEdgeObservable.filter( edge -> {
+ final long edgeTimestamp = edge.getTimestamp();
- //the source Id has been marked for deletion. It's version is <= to the marked version for deletion,
- // so we need to discard it
- if ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ) {
- return false;
- }
+ //our edge needs to not be deleted and have a version that's > max Version
+ if ( edge.isDeleted() || Long.compare( edgeTimestamp, maxVersion ) > 0 ) {
+ return false;
+ }
- final Long targetTimestamp = markedVersions.get( edge.getTargetNode() );
- //the target Id has been marked for deletion. It's version is <= to the marked version for deletion,
- // so we need to discard it
- if ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ) {
- return false;
- }
+ final Long sourceTimestamp = markedVersions.get( edge.getSourceNode() );
+
+ //the source Id has been marked for deletion. It's version is <= to the marked version for
+ // deletion,
+ // so we need to discard it
+ if ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ) {
+ return false;
+ }
+
+ final Long targetTimestamp = markedVersions.get( edge.getTargetNode() );
+
+ //the target Id has been marked for deletion. It's version is <= to the marked version for
+ // deletion,
+ // so we need to discard it
+ if ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ) {
+ return false;
+ }
- return true;
+ return true;
+ } );
} );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59ad6a98/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java
index 9d989a3..bcbff0a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java
@@ -54,6 +54,14 @@ public class SimpleSearchByIdType extends SimpleSearchByEdgeType implements Sear
}
+ public SimpleSearchByIdType( final Id node, final String type, final long maxTimestamp, final Order order, final String idType,
+ final Optional<Edge> last, final boolean filterMarked ) {
+ super( node, type, maxTimestamp, order, last, filterMarked );
+ ValidationUtils.verifyString( idType, "idType" );
+ this.idType = idType;
+ }
+
+
@Override
public String getIdType() {
return idType;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59ad6a98/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
index 75f61f0..e512608 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
@@ -43,18 +43,21 @@ import com.google.inject.Inject;
import rx.Observable;
+import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createGetByEdge;
-import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdge;
import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdgeAndId;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdgeAndIdUnfiltered;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdgeUnfiltered;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-@RunWith(ITRunner.class)
-@UseModules({ TestGraphModule.class })
+
+@RunWith( ITRunner.class )
+@UseModules( { TestGraphModule.class } )
public class GraphManagerIT {
@@ -69,18 +72,16 @@ public class GraphManagerIT {
protected ApplicationScope scope;
-
-
@Before
public void mockApp() {
- this.scope = new ApplicationScopeImpl(createId("application") );
+ this.scope = new ApplicationScopeImpl( createId( "application" ) );
}
@Test
public void testWriteReadEdgeTypeSource() throws TimeoutException, InterruptedException {
- GraphManager gm = emf.createEdgeManager( scope );
+ GraphManager gm = emf.createEdgeManager( scope );
Edge edge = createEdge( "source", "test", "target" );
@@ -112,7 +113,7 @@ public class GraphManagerIT {
@Test
public void testWriteReadEdgeTypeTarget() throws TimeoutException, InterruptedException {
- GraphManager gm = emf.createEdgeManager( scope );
+ GraphManager gm = emf.createEdgeManager( scope );
Edge edge = createEdge( "source", "test", "target" );
@@ -144,7 +145,7 @@ public class GraphManagerIT {
@Test
public void testWriteReadEdgeTypeVersionSource() throws TimeoutException, InterruptedException {
- GraphManager gm = emf.createEdgeManager( scope );
+ GraphManager gm = emf.createEdgeManager( scope );
final long earlyVersion = 1000l;
@@ -178,7 +179,7 @@ public class GraphManagerIT {
@Test
public void testWriteReadEdgeTypeVersionTarget() throws TimeoutException, InterruptedException {
- GraphManager gm = emf.createEdgeManager( scope );
+ GraphManager gm = emf.createEdgeManager( scope );
final long earlyVersion = 10000l;
@@ -216,7 +217,7 @@ public class GraphManagerIT {
@Test
public void testWriteReadEdgeTypeVersionSourceDistinct() throws TimeoutException, InterruptedException {
- GraphManager gm = emf.createEdgeManager( scope );
+ GraphManager gm = emf.createEdgeManager( scope );
final long earlyVersion = 10000l;
@@ -241,7 +242,7 @@ public class GraphManagerIT {
//now test retrieving it, we should only get edge3, since it's the latest
SearchByEdgeType search =
- createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getTimestamp(), null );
+ createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getTimestamp(), null );
Observable<Edge> edges = gm.loadEdgesFromSource( search );
@@ -288,7 +289,7 @@ public class GraphManagerIT {
public void testWriteReadEdgeTypeVersionTargetDistinct() throws TimeoutException, InterruptedException {
- GraphManager gm = emf.createEdgeManager( scope );
+ GraphManager gm = emf.createEdgeManager( scope );
final long earlyVersion = 10000l;
@@ -314,7 +315,7 @@ public class GraphManagerIT {
//now test retrieving it, we should only get edge3, since it's the latest
SearchByEdgeType search =
- createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getTimestamp(), null );
+ createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getTimestamp(), null );
Observable<Edge> edges = gm.loadEdgesToTarget( search );
@@ -360,7 +361,7 @@ public class GraphManagerIT {
@Test
public void testWriteReadEdgeTypePagingSource() throws TimeoutException, InterruptedException {
- GraphManager gm = emf.createEdgeManager( scope );
+ GraphManager gm = emf.createEdgeManager( scope );
final Id sourceId = IdGenerator.createId( "source" );
@@ -380,7 +381,7 @@ public class GraphManagerIT {
//now test retrieving it
SearchByEdgeType search =
- createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getTimestamp(), null );
+ createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getTimestamp(), null );
Observable<Edge> edges = gm.loadEdgesFromSource( search );
@@ -415,7 +416,7 @@ public class GraphManagerIT {
public void testWriteReadEdgeTypePagingTarget() {
- GraphManager gm = emf.createEdgeManager( scope );
+ GraphManager gm = emf.createEdgeManager( scope );
final Id targetId = IdGenerator.createId( "target" );
@@ -436,7 +437,7 @@ public class GraphManagerIT {
//now test retrieving it
SearchByEdgeType search =
- createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getTimestamp(), null );
+ createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getTimestamp(), null );
Observable<Edge> edges = gm.loadEdgesToTarget( search );
@@ -470,7 +471,7 @@ public class GraphManagerIT {
@Test
public void testWriteReadEdgeTypeTargetTypeSource() {
- GraphManager gm = emf.createEdgeManager( scope );
+ GraphManager gm = emf.createEdgeManager( scope );
Edge edge = createEdge( "source", "test", "target" );
@@ -480,7 +481,7 @@ public class GraphManagerIT {
//now test retrieving it
SearchByIdType search = createSearchByEdgeAndId( edge.getSourceNode(), edge.getType(), edge.getTimestamp(),
- edge.getTargetNode().getType(), null );
+ edge.getTargetNode().getType(), null );
Observable<Edge> edges = gm.loadEdgesFromSourceByType( search );
@@ -492,7 +493,7 @@ public class GraphManagerIT {
//change edge type to be invalid, shouldn't get a result
search = createSearchByEdgeAndId( edge.getSourceNode(), edge.getType(), edge.getTimestamp(),
- edge.getTargetNode().getType() + "invalid", null );
+ edge.getTargetNode().getType() + "invalid", null );
edges = gm.loadEdgesFromSourceByType( search );
@@ -506,7 +507,7 @@ public class GraphManagerIT {
@Test
public void testWriteReadEdgeTypeTargetTypeTarget() {
- GraphManager gm = emf.createEdgeManager( scope );
+ GraphManager gm = emf.createEdgeManager( scope );
;
@@ -517,7 +518,7 @@ public class GraphManagerIT {
//now test retrieving it
SearchByIdType search = createSearchByEdgeAndId( edge.getTargetNode(), edge.getType(), edge.getTimestamp(),
- edge.getSourceNode().getType(), null );
+ edge.getSourceNode().getType(), null );
Observable<Edge> edges = gm.loadEdgesToTargetByType( search );
@@ -529,7 +530,7 @@ public class GraphManagerIT {
//change edge type to be invalid, shouldn't get a result
search = createSearchByEdgeAndId( edge.getTargetNode(), edge.getType(), edge.getTimestamp(),
- edge.getSourceNode().getType() + "invalid", null );
+ edge.getSourceNode().getType() + "invalid", null );
edges = gm.loadEdgesToTargetByType( search );
@@ -543,7 +544,7 @@ public class GraphManagerIT {
@Test
public void testWriteReadEdgeDeleteSource() {
- GraphManager gm = emf.createEdgeManager( scope );
+ GraphManager gm = emf.createEdgeManager( scope );
Edge edge = createEdge( "source", "test", "target" );
@@ -563,7 +564,7 @@ public class GraphManagerIT {
assertEquals( "Correct edge returned", edge, returned );
SearchByIdType searchById = createSearchByEdgeAndId( edge.getSourceNode(), edge.getType(), edge.getTimestamp(),
- edge.getTargetNode().getType(), null );
+ edge.getTargetNode().getType(), null );
edges = gm.loadEdgesFromSourceByType( searchById );
@@ -573,8 +574,7 @@ public class GraphManagerIT {
assertEquals( "Correct edge returned", edge, returned );
final SearchByEdge searchByEdge =
- createGetByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(),
- null );
+ createGetByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), null );
returned = gm.loadEdgeVersions( searchByEdge ).toBlocking().single();
@@ -613,7 +613,7 @@ public class GraphManagerIT {
@Test
public void testWriteReadEdgeDeleteTarget() {
- GraphManager gm = emf.createEdgeManager( scope );
+ GraphManager gm = emf.createEdgeManager( scope );
Edge edge = createEdge( "source", "test", "target" );
@@ -633,7 +633,7 @@ public class GraphManagerIT {
assertEquals( "Correct edge returned", edge, returned );
SearchByIdType searchById = createSearchByEdgeAndId( edge.getTargetNode(), edge.getType(), edge.getTimestamp(),
- edge.getSourceNode().getType(), null );
+ edge.getSourceNode().getType(), null );
edges = gm.loadEdgesToTargetByType( searchById );
@@ -669,7 +669,7 @@ public class GraphManagerIT {
@Test
public void testWriteReadEdgeTypesSourceTypes() {
- final GraphManager gm = emf.createEdgeManager( scope );
+ final GraphManager gm = emf.createEdgeManager( scope );
Id sourceId = new SimpleId( "source" );
Id targetId1 = new SimpleId( "target" );
@@ -691,7 +691,7 @@ public class GraphManagerIT {
//get our 2 edge types
Observable<String> edges =
- gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), null, null ) );
+ gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), null, null ) );
Iterator<String> results = edges.toBlocking().getIterator();
@@ -718,8 +718,8 @@ public class GraphManagerIT {
assertFalse( "No results", results.hasNext() );
//now get types for test2
- edges = gm.getIdTypesFromSource(
- new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test2", null, null ) );
+ edges =
+ gm.getIdTypesFromSource( new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test2", null, null ) );
results = edges.toBlocking().getIterator();
@@ -732,7 +732,7 @@ public class GraphManagerIT {
@Test
public void testWriteReadEdgeTypesTargetTypes() {
- final GraphManager gm = emf.createEdgeManager( scope );
+ final GraphManager gm = emf.createEdgeManager( scope );
Id sourceId1 = new SimpleId( "source" );
Id sourceId2 = new SimpleId( "source2" );
@@ -797,7 +797,7 @@ public class GraphManagerIT {
@Test
public void testWriteReadEdgeTypesSourceTypesPaging() {
- final GraphManager gm = emf.createEdgeManager( scope );
+ final GraphManager gm = emf.createEdgeManager( scope );
Id sourceId1 = new SimpleId( "source" );
Id targetId1 = new SimpleId( "target" );
@@ -861,7 +861,7 @@ public class GraphManagerIT {
//now get the next page
edges = gm.getIdTypesFromSource(
- new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test", null, targetId1.getType() ) );
+ new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test", null, targetId1.getType() ) );
results = edges.toBlocking().getIterator();
@@ -875,7 +875,7 @@ public class GraphManagerIT {
@Test
public void testWriteReadEdgeTypesTargetTypesPaging() {
- final GraphManager gm = emf.createEdgeManager( scope );
+ final GraphManager gm = emf.createEdgeManager( scope );
Id sourceId1 = new SimpleId( "source" );
Id sourceId2 = new SimpleId( "source2" );
@@ -942,7 +942,7 @@ public class GraphManagerIT {
//now get the next page
edges = gm.getIdTypesToTarget(
- new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", null, sourceId1.getType() ) );
+ new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", null, sourceId1.getType() ) );
results = edges.toBlocking().getIterator();
@@ -956,7 +956,7 @@ public class GraphManagerIT {
@Test
public void testMarkSourceEdges() throws InterruptedException {
- final GraphManager gm = emf.createEdgeManager( scope );
+ final GraphManager gm = emf.createEdgeManager( scope );
Id sourceId = new SimpleId( "source" );
Id targetId1 = new SimpleId( "target" );
@@ -966,11 +966,11 @@ public class GraphManagerIT {
long startTime = System.currentTimeMillis();
long edge1Time = startTime;
- long edge2Time = edge1Time+1;
+ long edge2Time = edge1Time + 1;
- final long maxVersion= edge2Time;
+ final long maxVersion = edge2Time;
- Edge edge1 = createEdge( sourceId, "test", targetId1, edge1Time);
+ Edge edge1 = createEdge( sourceId, "test", targetId1, edge1Time );
gm.writeEdge( edge1 ).toBlocking().singleOrDefault( null );
@@ -979,21 +979,18 @@ public class GraphManagerIT {
gm.writeEdge( edge2 ).toBlocking().singleOrDefault( null );
-
assertTrue( Long.compare( maxVersion, edge2.getTimestamp() ) >= 0 );
assertTrue( Long.compare( maxVersion, edge1.getTimestamp() ) >= 0 );
//get our 2 edges
- Observable<Edge> edges = gm.loadEdgesFromSource(
- createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
+ Observable<Edge> edges =
+ gm.loadEdgesFromSource( createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
Iterator<Edge> results = edges.toBlocking().getIterator();
- System.out.println( "\n\n\n\n\n\n\n\n\n\n" );
-
assertEquals( "Edges correct", edge2, results.next() );
assertEquals( "Edges correct", edge1, results.next() );
@@ -1002,15 +999,12 @@ public class GraphManagerIT {
//now delete one of the edges
- System.out.println( "\n\n\n\n\n\n\n\n\n\n" );
gm.markEdge( edge1 ).toBlocking().last();
- System.out.println( "\n\n\n\n\n\n\n\n\n\n" );
-
- edges = gm.loadEdgesFromSource(
- createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
+ edges =
+ gm.loadEdgesFromSource( createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
results = edges.toBlocking().getIterator();
@@ -1020,16 +1014,14 @@ public class GraphManagerIT {
assertFalse( "No more edges", results.hasNext() );
- System.out.println( "\n\n\n\n\n\n\n\n\n\n" );
//now delete one of the edges
gm.markEdge( edge2 ).toBlocking().last();
- System.out.println( "\n\n\n\n\n\n\n\n\n\n" );
- edges = gm.loadEdgesFromSource(
- createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
+ edges =
+ gm.loadEdgesFromSource( createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
results = edges.toBlocking().getIterator();
@@ -1045,7 +1037,7 @@ public class GraphManagerIT {
@Test
public void testMarkTargetEdges() {
- final GraphManager gm = emf.createEdgeManager( scope );
+ final GraphManager gm = emf.createEdgeManager( scope );
Id sourceId1 = new SimpleId( "source" );
Id sourceId2 = new SimpleId( "source2" );
@@ -1065,7 +1057,7 @@ public class GraphManagerIT {
//get our 2 edges
Observable<Edge> edges =
- gm.loadEdgesToTarget( createSearchByEdge( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) );
+ gm.loadEdgesToTarget( createSearchByEdge( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) );
Iterator<Edge> results = edges.toBlocking().getIterator();
@@ -1111,9 +1103,163 @@ public class GraphManagerIT {
@Test
+ public void testMarkCompactSourceEdges() throws InterruptedException {
+
+ final GraphManager gm = emf.createEdgeManager( scope );
+
+ Id sourceId = new SimpleId( "source" );
+ Id targetId1 = new SimpleId( "target" );
+ Id targetId2 = new SimpleId( "target2" );
+
+
+ long startTime = System.currentTimeMillis();
+
+ long edge1Time = startTime;
+ long edge2Time = edge1Time + 1;
+
+ final long maxVersion = edge2Time;
+
+ Edge edge1 = createEdge( sourceId, "test", targetId1, edge1Time );
+
+ gm.writeEdge( edge1 ).toBlocking().singleOrDefault( null );
+
+ Edge edge2 = createEdge( sourceId, "test", targetId2, edge2Time );
+
+ gm.writeEdge( edge2 ).toBlocking().singleOrDefault( null );
+
+
+ assertTrue( Long.compare( maxVersion, edge2.getTimestamp() ) >= 0 );
+ assertTrue( Long.compare( maxVersion, edge1.getTimestamp() ) >= 0 );
+
+
+ gm.markEdge( edge1 ).toBlocking().last();
+
+
+ //get our 2 edges
+ Observable<Edge> edges = gm.loadEdgesFromSource(
+ createSearchByEdgeUnfiltered( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
+
+
+ Iterator<Edge> results = edges.toBlocking().getIterator();
+
+
+ assertEquals( "Edges correct", edge2.getTargetNode(), results.next().getTargetNode() );
+
+ assertEquals( "Edges correct", edge1.getTargetNode(), results.next().getTargetNode() );
+
+ assertFalse( "No more edges", results.hasNext() );
+
+
+ //now delete one of the edges
+
+ gm.deleteEdge( edge1 ).toBlocking().last();
+
+
+ edges = gm.loadEdgesFromSource(
+ createSearchByEdgeUnfiltered( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
+
+
+ results = edges.toBlocking().getIterator();
+
+ assertEquals( "Edges correct", edge2.getTargetNode(), results.next().getTargetNode() );
+
+ assertFalse( "No more edges", results.hasNext() );
+
+ //delete an edge we didn't mark, should be a no-op
+ gm.deleteEdge( edge2 ).toBlocking().lastOrDefault( null );
+
+ edges = gm.loadEdgesFromSource(
+ createSearchByEdgeUnfiltered( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
+
+
+ results = edges.toBlocking().getIterator();
+
+
+ assertEquals( "Edges correct", edge2.getSourceNode(), results.next().getSourceNode() );
+
+ assertFalse( "No more edges", results.hasNext() );
+ }
+
+
+ @Test
+ public void testMarkCompactTargetEdges() {
+
+ final GraphManager gm = emf.createEdgeManager( scope );
+
+ Id sourceId1 = new SimpleId( "source" );
+ Id sourceId2 = new SimpleId( "source2" );
+ Id targetId = new SimpleId( "target" );
+
+ Edge edge1 = createEdge( sourceId1, "test", targetId, System.currentTimeMillis() );
+
+ gm.writeEdge( edge1 ).toBlocking().last();
+
+ Edge edge2 = createEdge( sourceId2, "test", targetId, System.currentTimeMillis() );
+
+ gm.writeEdge( edge2 ).toBlocking().last();
+
+
+ final long maxVersion = System.currentTimeMillis();
+
+
+ //now delete one of the edges
+
+ gm.markEdge( edge1 ).toBlocking().last();
+
+ //get our 2 edges
+ Observable<Edge> edges = gm.loadEdgesToTarget(
+ createSearchByEdgeUnfiltered( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) );
+
+
+ Iterator<Edge> results = edges.toBlocking().getIterator();
+
+
+ assertEquals( "Edges correct", edge2.getSourceNode(), results.next().getSourceNode() );
+
+ assertEquals( "Edges correct", edge1.getSourceNode(), results.next().getSourceNode() );
+
+
+ assertFalse( "No more edges", results.hasNext() );
+
+
+ //now delete one of the edges
+
+ gm.deleteEdge( edge1 ).toBlocking().last();
+
+ edges = gm.loadEdgesToTarget(
+ createSearchByEdgeUnfiltered( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) );
+
+
+ results = edges.toBlocking().getIterator();
+
+
+ assertEquals( "Edges correct", edge2.getSourceNode(), results.next().getSourceNode() );
+
+ assertFalse( "No more edges", results.hasNext() );
+
+ //now delete one of the edges
+
+
+ //delete an edge we didn't mark, should be a no-op
+ gm.deleteEdge( edge2 ).toBlocking().lastOrDefault( null );
+
+ edges = gm.loadEdgesToTarget(
+ createSearchByEdgeUnfiltered( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) );
+
+
+ results = edges.toBlocking().getIterator();
+
+
+ assertEquals( "Edges correct", edge2.getSourceNode(), results.next().getSourceNode() );
+
+ assertFalse( "No more edges", results.hasNext() );
+ }
+
+
+ @Test
public void testMarkSourceEdgesType() {
- final GraphManager gm = emf.createEdgeManager( scope );
+ final GraphManager gm = emf.createEdgeManager( scope );
Id sourceId = new SimpleId( "source" );
Id targetId1 = new SimpleId( "target" );
@@ -1133,7 +1279,7 @@ public class GraphManagerIT {
//get our 2 edges
Observable<Edge> edges = gm.loadEdgesFromSourceByType(
- createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) );
+ createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) );
Iterator<Edge> results = edges.toBlocking().getIterator();
@@ -1149,7 +1295,7 @@ public class GraphManagerIT {
edges = gm.loadEdgesFromSourceByType(
- createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) );
+ createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) );
results = edges.toBlocking().getIterator();
@@ -1158,7 +1304,7 @@ public class GraphManagerIT {
edges = gm.loadEdgesFromSourceByType(
- createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId2.getType(), null ) );
+ createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId2.getType(), null ) );
results = edges.toBlocking().getIterator();
@@ -1174,7 +1320,7 @@ public class GraphManagerIT {
edges = gm.loadEdgesFromSourceByType(
- createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId2.getType(), null ) );
+ createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId2.getType(), null ) );
results = edges.toBlocking().getIterator();
@@ -1191,7 +1337,7 @@ public class GraphManagerIT {
@Test
public void testMarkTargetEdgesType() {
- final GraphManager gm = emf.createEdgeManager( scope );
+ final GraphManager gm = emf.createEdgeManager( scope );
Id sourceId1 = new SimpleId( "source" );
Id sourceId2 = new SimpleId( "source2" );
@@ -1210,7 +1356,7 @@ public class GraphManagerIT {
//get our 2 edges
Observable<Edge> edges = gm.loadEdgesToTargetByType(
- createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) );
+ createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) );
Iterator<Edge> results = edges.toBlocking().getIterator();
@@ -1226,8 +1372,7 @@ public class GraphManagerIT {
edges = gm.loadEdgesToTargetByType(
- createSearchByEdgeAndId( edge1.getSourceNode(), edge1.getType(), maxVersion, sourceId1.getType(),
- null ) );
+ createSearchByEdgeAndId( edge1.getSourceNode(), edge1.getType(), maxVersion, sourceId1.getType(), null ) );
results = edges.toBlocking().getIterator();
@@ -1236,7 +1381,7 @@ public class GraphManagerIT {
edges = gm.loadEdgesToTargetByType(
- createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId2.getType(), null ) );
+ createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId2.getType(), null ) );
results = edges.toBlocking().getIterator();
@@ -1252,7 +1397,7 @@ public class GraphManagerIT {
edges = gm.loadEdgesToTargetByType(
- createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId2.getType(), null ) );
+ createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId2.getType(), null ) );
results = edges.toBlocking().getIterator();
@@ -1269,7 +1414,7 @@ public class GraphManagerIT {
@Test
public void markSourceNode() {
- final GraphManager gm = emf.createEdgeManager( scope );
+ final GraphManager gm = emf.createEdgeManager( scope );
Id sourceId = new SimpleId( "source" );
Id targetId1 = new SimpleId( "target" );
@@ -1287,8 +1432,8 @@ public class GraphManagerIT {
final long maxVersion = System.currentTimeMillis();
Iterator<Edge> results =
- gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) )
- .toBlocking().getIterator();
+ gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking()
+ .getIterator();
assertEquals( "Edge found", edge2, results.next() );
@@ -1300,8 +1445,8 @@ public class GraphManagerIT {
//get our 2 edges
results = gm.loadEdgesFromSourceByType(
- createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) )
- .toBlocking().getIterator();
+ createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) ).toBlocking()
+ .getIterator();
assertEquals( "Edges correct", edge1, results.next() );
@@ -1310,8 +1455,8 @@ public class GraphManagerIT {
//now delete one of the edges
results = gm.loadEdgesFromSourceByType(
- createSearchByEdgeAndId( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) )
- .toBlocking().getIterator();
+ createSearchByEdgeAndId( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) ).toBlocking()
+ .getIterator();
assertEquals( "Edges correct", edge2, results.next() );
@@ -1324,26 +1469,61 @@ public class GraphManagerIT {
//now re-read, nothing should be there since they're marked
- results = gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) )
- .toBlocking().getIterator();
+ results =
+ gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking()
+ .getIterator();
assertFalse( "No more edges", results.hasNext() );
//get our 2 edges
results = gm.loadEdgesFromSourceByType(
- createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) )
- .toBlocking().getIterator();
+ createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) ).toBlocking()
+ .getIterator();
assertFalse( "No more edges", results.hasNext() );
+
//now delete one of the edges
results = gm.loadEdgesFromSourceByType(
- createSearchByEdgeAndId( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) )
+ createSearchByEdgeAndId( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) ).toBlocking()
+ .getIterator();
+
+
+ assertFalse( "No more edges", results.hasNext() );
+
+
+ //test they come back unfiltered
+
+ //read with filter marked off to ensure they're still persisted
+ results = gm.loadEdgesFromSource( createSearchByEdgeUnfiltered( sourceId, edge1.getType(), maxVersion, null ) )
+ .toBlocking().getIterator();
+
+
+ assertEquals( "Edge found", edge2, results.next() );
+
+ assertEquals( "Edge found", edge1, results.next() );
+
+ assertFalse( "No more edges", results.hasNext() );
+
+
+ results = gm.loadEdgesFromSourceByType(
+ createSearchByEdgeAndIdUnfiltered( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) )
.toBlocking().getIterator();
+ assertEquals( "Edges correct", edge1, results.next() );
+
+ assertFalse( "No more edges", results.hasNext() );
+
+ results = gm.loadEdgesFromSourceByType(
+ createSearchByEdgeAndIdUnfiltered( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) )
+ .toBlocking().getIterator();
+
+
+ assertEquals( "Edges correct", edge2, results.next() );
+
assertFalse( "No more edges", results.hasNext() );
}
@@ -1351,7 +1531,7 @@ public class GraphManagerIT {
@Test
public void markTargetNode() {
- final GraphManager gm = emf.createEdgeManager( scope );
+ final GraphManager gm = emf.createEdgeManager( scope );
Id sourceId1 = new SimpleId( "source" );
Id sourceId2 = new SimpleId( "source2" );
@@ -1369,8 +1549,8 @@ public class GraphManagerIT {
final long maxVersion = System.currentTimeMillis();
Iterator<Edge> results =
- gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) )
- .toBlocking().getIterator();
+ gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking()
+ .getIterator();
assertEquals( "Edge found", edge2, results.next() );
@@ -1382,7 +1562,69 @@ public class GraphManagerIT {
//get our 2 edges
results = gm.loadEdgesToTargetByType(
- createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) )
+ createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) ).toBlocking()
+ .getIterator();
+
+
+ assertEquals( "Edges correct", edge1, results.next() );
+
+ assertFalse( "No more edges", results.hasNext() );
+
+ //now delete one of the edges
+ results = gm.loadEdgesToTargetByType(
+ createSearchByEdgeAndId( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) ).toBlocking()
+ .getIterator();
+
+
+ assertEquals( "Edges correct", edge2, results.next() );
+
+ assertFalse( "No more edges", results.hasNext() );
+
+ //mark the target node
+ gm.markNode( targetId, edge2.getTimestamp() ).toBlocking().last();
+
+
+ //now re-read, nothing should be there since they're marked
+
+ results = gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking()
+ .getIterator();
+
+ assertFalse( "No more edges", results.hasNext() );
+
+
+ //get our 2 edges
+ results = gm.loadEdgesToTargetByType(
+ createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) ).toBlocking()
+ .getIterator();
+
+
+ assertFalse( "No more edges", results.hasNext() );
+
+ //now delete one of the edges
+ results = gm.loadEdgesToTargetByType(
+ createSearchByEdgeAndId( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) ).toBlocking()
+ .getIterator();
+
+
+ assertFalse( "No more edges", results.hasNext() );
+
+
+ //now test they come back when unfiltered
+
+ results = gm.loadEdgesToTarget( createSearchByEdgeUnfiltered( targetId, edge1.getType(), maxVersion, null ) )
+ .toBlocking().getIterator();
+
+
+ assertEquals( "Edge found", edge2, results.next() );
+
+ assertEquals( "Edge found", edge1, results.next() );
+
+ assertFalse( "No more edges", results.hasNext() );
+
+
+ //get our 2 edges
+ results = gm.loadEdgesToTargetByType(
+ createSearchByEdgeAndIdUnfiltered( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) )
.toBlocking().getIterator();
@@ -1392,10 +1634,180 @@ public class GraphManagerIT {
//now delete one of the edges
results = gm.loadEdgesToTargetByType(
- createSearchByEdgeAndId( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) )
+ createSearchByEdgeAndIdUnfiltered( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) )
+ .toBlocking().getIterator();
+
+
+ assertEquals( "Edges correct", edge2, results.next() );
+
+ assertFalse( "No more edges", results.hasNext() );
+ }
+
+
+ @Test
+ public void markDeleteSourceNode() {
+ //writes two edges from one source, marks the source node, checks unfiltered reads still see them, then compacts and expects none
+ final GraphManager gm = emf.createEdgeManager( scope );
+
+ Id sourceId = new SimpleId( "source" );
+ Id targetId1 = new SimpleId( "target" );
+ Id targetId2 = new SimpleId( "target2" );
+
+ Edge edge1 = createEdge( sourceId, "test", targetId1, System.currentTimeMillis() );
+
+ gm.writeEdge( edge1 ).toBlocking().singleOrDefault( null );
+
+ Edge edge2 = createEdge( sourceId, "test", targetId2, System.currentTimeMillis() );
+
+ gm.writeEdge( edge2 ).toBlocking().singleOrDefault( null );
+
+
+ final long maxVersion = System.currentTimeMillis();
+
+ Iterator<Edge> results =
+ gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking()
+ .getIterator();
+
+
+ assertEquals( "Edge found", edge2, results.next() );
+
+ assertEquals( "Edge found", edge1, results.next() );
+
+ assertFalse( "No more edges", results.hasNext() );
+
+
+ //read by the id type of targetId1; only edge1 is expected
+ results = gm.loadEdgesFromSourceByType(
+ createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) ).toBlocking()
+ .getIterator();
+
+
+ assertEquals( "Edges correct", edge1, results.next() );
+
+ assertFalse( "No more edges", results.hasNext() );
+
+ //read by the id type of targetId2; only edge2 is expected
+ results = gm.loadEdgesFromSourceByType(
+ createSearchByEdgeAndId( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) ).toBlocking()
+ .getIterator();
+
+
+ assertEquals( "Edges correct", edge2, results.next() );
+
+ assertFalse( "No more edges", results.hasNext() );
+
+ //mark the source node
+ gm.markNode( sourceId, edge2.getTimestamp() ).toBlocking().last();
+
+
+ //test they come back unfiltered
+
+ //read with filter marked off to ensure they're still persisted
+ results = gm.loadEdgesFromSource( createSearchByEdgeUnfiltered( sourceId, edge1.getType(), maxVersion, null ) )
+ .toBlocking().getIterator();
+
+
+ assertEquals( "Edge found", edge2, results.next() );
+
+ assertEquals( "Edge found", edge1, results.next() );
+
+ assertFalse( "No more edges", results.hasNext() );
+
+
+ results = gm.loadEdgesFromSourceByType(
+ createSearchByEdgeAndIdUnfiltered( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) )
+ .toBlocking().getIterator();
+
+
+ assertEquals( "Edges correct", edge1, results.next() );
+
+ assertFalse( "No more edges", results.hasNext() );
+
+ results = gm.loadEdgesFromSourceByType(
+ createSearchByEdgeAndIdUnfiltered( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) )
+ .toBlocking().getIterator();
+
+
+ assertEquals( "Edges correct", edge2, results.next() );
+
+ assertFalse( "No more edges", results.hasNext() );
+
+ //now compact the marked node; the unfiltered reads below should then return nothing
+
+
+ gm.compactNode( sourceId ).toBlocking().last();
+
+ results = gm.loadEdgesFromSource( createSearchByEdgeUnfiltered( sourceId, edge1.getType(), maxVersion, null ) )
.toBlocking().getIterator();
+ assertFalse( "No more edges", results.hasNext() );
+
+
+ results = gm.loadEdgesFromSourceByType(
+ createSearchByEdgeAndIdUnfiltered( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) )
+ .toBlocking().getIterator();
+
+
+ assertFalse( "No more edges", results.hasNext() );
+
+ results = gm.loadEdgesFromSourceByType(
+ createSearchByEdgeAndIdUnfiltered( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) )
+ .toBlocking().getIterator();
+
+
+ assertFalse( "No more edges", results.hasNext() );
+ }
+
+
+ @Test
+ public void markDeleteTargetNode() {
+
+ final GraphManager gm = emf.createEdgeManager( scope );
+
+ Id sourceId1 = new SimpleId( "source" );
+ Id sourceId2 = new SimpleId( "source2" );
+ Id targetId = new SimpleId( "target" );
+
+ Edge edge1 = createEdge( sourceId1, "test", targetId, System.currentTimeMillis() );
+
+ gm.writeEdge( edge1 ).toBlocking().singleOrDefault( null );
+
+ Edge edge2 = createEdge( sourceId2, "test", targetId, System.currentTimeMillis() );
+
+ gm.writeEdge( edge2 ).toBlocking().singleOrDefault( null );
+
+
+ final long maxVersion = System.currentTimeMillis();
+
+ Iterator<Edge> results =
+ gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking()
+ .getIterator();
+
+
+ assertEquals( "Edge found", edge2, results.next() );
+
+ assertEquals( "Edge found", edge1, results.next() );
+
+ assertFalse( "No more edges", results.hasNext() );
+
+
+ //get our 2 edges
+ results = gm.loadEdgesToTargetByType(
+ createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) ).toBlocking()
+ .getIterator();
+
+
+ assertEquals( "Edges correct", edge1, results.next() );
+
+ assertFalse( "No more edges", results.hasNext() );
+
+ //now delete one of the edges
+ results = gm.loadEdgesToTargetByType(
+ createSearchByEdgeAndId( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) ).toBlocking()
+ .getIterator();
+
+
assertEquals( "Edges correct", edge2, results.next() );
assertFalse( "No more edges", results.hasNext() );
@@ -1406,15 +1818,55 @@ public class GraphManagerIT {
//now re-read, nothing should be there since they're marked
- results = gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) )
+
+ //now test they come back when unfiltered
+
+ results = gm.loadEdgesToTarget( createSearchByEdgeUnfiltered( targetId, edge1.getType(), maxVersion, null ) )
+ .toBlocking().getIterator();
+
+
+ assertEquals( "Edge found", edge2, results.next() );
+
+ assertEquals( "Edge found", edge1, results.next() );
+
+ assertFalse( "No more edges", results.hasNext() );
+
+
+ //get our 2 edges
+ results = gm.loadEdgesToTargetByType(
+ createSearchByEdgeAndIdUnfiltered( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) )
+ .toBlocking().getIterator();
+
+
+ assertEquals( "Edges correct", edge1, results.next() );
+
+ assertFalse( "No more edges", results.hasNext() );
+
+ //now delete one of the edges
+ results = gm.loadEdgesToTargetByType(
+ createSearchByEdgeAndIdUnfiltered( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) )
.toBlocking().getIterator();
+
+ assertEquals( "Edges correct", edge2, results.next() );
+
+ assertFalse( "No more edges", results.hasNext() );
+
+ //now compact, everything should be removed
+
+ gm.compactNode( targetId ).toBlocking().last();
+
+
+ results = gm.loadEdgesToTarget( createSearchByEdgeUnfiltered( targetId, edge1.getType(), maxVersion, null ) )
+ .toBlocking().getIterator();
+
+
assertFalse( "No more edges", results.hasNext() );
//get our 2 edges
results = gm.loadEdgesToTargetByType(
- createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) )
+ createSearchByEdgeAndIdUnfiltered( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) )
.toBlocking().getIterator();
@@ -1422,7 +1874,7 @@ public class GraphManagerIT {
//now delete one of the edges
results = gm.loadEdgesToTargetByType(
- createSearchByEdgeAndId( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) )
+ createSearchByEdgeAndIdUnfiltered( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) )
.toBlocking().getIterator();
@@ -1433,7 +1885,7 @@ public class GraphManagerIT {
@Test
public void testWriteReadEdgeTypesSourceTypesPrefix() {
- final GraphManager gm = emf.createEdgeManager( scope );
+ final GraphManager gm = emf.createEdgeManager( scope );
Id sourceId = new SimpleId( "source" );
Id targetId = new SimpleId( "target" );
@@ -1454,7 +1906,7 @@ public class GraphManagerIT {
//get our 2 edge types
Observable<String> edges =
- gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), "test1", null ) );
+ gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), "test1", null ) );
Iterator<String> results = edges.toBlocking().getIterator();
@@ -1483,7 +1935,7 @@ public class GraphManagerIT {
public void testSourceSubTypes() {
//now test sub edges
- final GraphManager gm = emf.createEdgeManager( scope );
+ final GraphManager gm = emf.createEdgeManager( scope );
Id sourceId = new SimpleId( "source" );
Id targetId1target1 = new SimpleId( "type1target1" );
@@ -1504,8 +1956,8 @@ public class GraphManagerIT {
gm.writeEdge( test2TargetEdge ).toBlocking().singleOrDefault( null );
- Observable<String> edges = gm.getIdTypesFromSource(
- new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test", "type1", null ) );
+ Observable<String> edges =
+ gm.getIdTypesFromSource( new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test", "type1", null ) );
Iterator<String> results = edges.toBlocking().getIterator();
@@ -1517,8 +1969,8 @@ public class GraphManagerIT {
assertFalse( "No results", results.hasNext() );
//now get types for test2
- edges = gm.getIdTypesFromSource(
- new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test", "type2", null ) );
+ edges =
+ gm.getIdTypesFromSource( new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test", "type2", null ) );
results = edges.toBlocking().getIterator();
@@ -1532,7 +1984,7 @@ public class GraphManagerIT {
@Test
public void testWriteReadEdgeTypesTargetTypesPrefix() {
- final GraphManager gm = emf.createEdgeManager( scope );
+ final GraphManager gm = emf.createEdgeManager( scope );
Id targetId = new SimpleId( "target" );
Id sourceId = new SimpleId( "source" );
@@ -1553,7 +2005,7 @@ public class GraphManagerIT {
//get our 2 edge types
Observable<String> edges =
- gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( testTargetEdge.getTargetNode(), "test1", null ) );
+ gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( testTargetEdge.getTargetNode(), "test1", null ) );
Iterator<String> results = edges.toBlocking().getIterator();
@@ -1582,7 +2034,7 @@ public class GraphManagerIT {
public void testTargetSubTypes() {
//now test sub edges
- final GraphManager gm = emf.createEdgeManager( scope );
+ final GraphManager gm = emf.createEdgeManager( scope );
Id targetId = new SimpleId( "target" );
Id sourceId1target1 = new SimpleId( "type1source1" );
@@ -1603,8 +2055,8 @@ public class GraphManagerIT {
gm.writeEdge( test2TargetEdge ).toBlocking().singleOrDefault( null );
- Observable<String> edges = gm.getIdTypesToTarget(
- new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", "type1", null ) );
+ Observable<String> edges =
+ gm.getIdTypesToTarget( new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", "type1", null ) );
Iterator<String> results = edges.toBlocking().getIterator();
@@ -1616,8 +2068,8 @@ public class GraphManagerIT {
assertFalse( "No results", results.hasNext() );
//now get types for test2
- edges = gm.getIdTypesToTarget(
- new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", "type2", null ) );
+ edges =
+ gm.getIdTypesToTarget( new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", "type2", null ) );
results = edges.toBlocking().getIterator();
@@ -1629,7 +2081,7 @@ public class GraphManagerIT {
@Test( expected = NullPointerException.class )
- public void invalidEdgeTypesWrite( ) {
+ public void invalidEdgeTypesWrite() {
final GraphManager em = emf.createEdgeManager( scope );
em.writeEdge( null );
@@ -1637,7 +2089,7 @@ public class GraphManagerIT {
@Test( expected = NullPointerException.class )
- public void invalidEdgeTypesDelete( ) {
+ public void invalidEdgeTypesDelete() {
final GraphManager em = emf.createEdgeManager( scope );
em.markEdge( null );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59ad6a98/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
index b2142e8..718fc70 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
@@ -152,6 +152,19 @@ public class EdgeTestUtils {
return new SimpleSearchByEdgeType( sourceId, type, maxVersion, SearchByEdgeType.Order.DESCENDING, last );
}
+ /**
+ * Builds a descending-order SimpleSearchByEdgeType, passing false as the final constructor argument (presumably turns off filtering of marked edges - TODO confirm against the constructor).
+ * @param sourceId the node id to search on; tests pass either a source or a target node id
+ * @param type the edge type to search for
+ * @param maxVersion the max version value handed to the search; tests pass a System.currentTimeMillis() value
+ * @param last the optional last edge; may be null because it is wrapped with Optional.fromNullable
+ * @return the newly constructed search
+ */
+ public static SearchByEdgeType createSearchByEdgeUnfiltered( final Id sourceId, final String type, final long maxVersion,
+ final Edge last ) {
+ return new SimpleSearchByEdgeType( sourceId, type, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.fromNullable( last ) , false );
+ }
+
/**
*
@@ -167,6 +180,20 @@ public class EdgeTestUtils {
return new SimpleSearchByIdType( sourceId, type, maxVersion, SearchByEdgeType.Order.DESCENDING, idType, Optional.fromNullable(last) );
}
+ /**
+ * Builds a descending-order SimpleSearchByIdType, passing false as the final constructor argument (presumably turns off filtering of marked edges - TODO confirm against the constructor).
+ * @param sourceId the node id to search on; tests pass either a source or a target node id
+ * @param type the edge type to search for
+ * @param maxVersion the max version value handed to the search; tests pass a System.currentTimeMillis() value
+ * @param idType the id type to restrict the search to; tests pass the type of the node at the other end of the edge
+ * @param last the optional last edge; may be null because it is wrapped with Optional.fromNullable
+ * @return the newly constructed search
+ */
+ public static SearchByIdType createSearchByEdgeAndIdUnfiltered( final Id sourceId, final String type, final long maxVersion,
+ final String idType, final Edge last ) {
+ return new SimpleSearchByIdType( sourceId, type, maxVersion, SearchByEdgeType.Order.DESCENDING, idType, Optional.fromNullable(last), false );
+ }
+
/**
*