You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2015/10/26 17:58:14 UTC
[4/7] usergrid git commit: Changed the interface to return MarkedEdge
interface. This allows the caller to determine if they want to filter or not,
as well as to perform read-repair actions on read.
Changed the GraphManager interface to return MarkedEdge instead of Edge. This allows the caller to decide whether or not to filter marked edges, as well as to perform read-repair actions on read.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1d4f22c5
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1d4f22c5
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1d4f22c5
Branch: refs/heads/2.1-release
Commit: 1d4f22c59c057a6f9d52844e03897786b0773a5b
Parents: 4e51d38
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Oct 23 15:25:39 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Oct 23 15:25:39 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpEntityManagerFactory.java | 7 +-
.../corepersistence/CpRelationManager.java | 16 ++---
.../read/traverse/AbstractReadGraphFilter.java | 74 +++++++++++++++----
.../read/traverse/EdgeCursorSerializer.java | 8 ++-
.../traverse/ReadGraphCollectionFilter.java | 5 +-
.../ReadGraphConnectionByTypeFilter.java | 11 +--
.../traverse/ReadGraphConnectionFilter.java | 5 +-
.../index/AsyncIndexServiceTest.java | 3 +-
.../corepersistence/index/IndexServiceTest.java | 12 ++--
.../pipeline/cursor/CursorTest.java | 24 +++----
.../service/ConnectionServiceImplTest.java | 5 +-
.../persistence/ApplicationServiceIT.java | 4 +-
.../persistence/graph/GraphManager.java | 14 ++--
.../usergrid/persistence/graph/MarkedEdge.java | 1 +
.../graph/impl/GraphManagerImpl.java | 30 ++++----
.../graph/impl/SimpleMarkedEdge.java | 39 ++++++----
.../persistence/graph/GraphManagerIT.java | 76 ++++++++++----------
.../persistence/graph/GraphManagerLoadTest.java | 10 +--
.../graph/GraphManagerShardConsistencyIT.java | 6 +-
.../graph/GraphManagerStressTest.java | 16 ++---
.../management/AppInfoMigrationPlugin.java | 5 +-
21 files changed, 220 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 4bdade5..c75a025 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -63,6 +63,7 @@ import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.index.EntityIndex;
@@ -477,9 +478,9 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
logger.debug("getApplications(): Loading edges of edgeType {} from {}:{}",
new Object[]{edgeType, managementId.getType(), managementId.getUuid()});
- Observable<Edge> edges = gm.loadEdgesFromSource( new SimpleSearchByEdgeType(
- managementId, edgeType, Long.MAX_VALUE,
- SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ));
+ Observable<MarkedEdge> edges = gm.loadEdgesFromSource(
+ new SimpleSearchByEdgeType( managementId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+ Optional.<Edge>absent() ) );
final EntityCollectionManager ecm = managerCache.getEntityCollectionManager( appScope );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index aad7610..b4cabc4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -59,6 +59,7 @@ import org.apache.usergrid.persistence.entities.Group;
import org.apache.usergrid.persistence.entities.User;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
@@ -227,14 +228,9 @@ public class CpRelationManager implements RelationManager {
Observable<Edge> edges =
gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeType, null ) )
- .flatMap( new Func1<String, Observable<Edge>>() {
- @Override
- public Observable<Edge> call( final String edgeType ) {
- return gm.loadEdgesToTarget(
- new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType, Long.MAX_VALUE,
- SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) );
- }
- } );
+ .flatMap( edgeType1 -> gm.loadEdgesToTarget(
+ new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType1, Long.MAX_VALUE,
+ SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ) );
//if our limit is set, take them. Note this logic is still borked, we can't possibly fit everything in memory
if ( limit > -1 ) {
@@ -268,7 +264,7 @@ public class CpRelationManager implements RelationManager {
} );
GraphManager gm = managerCache.getGraphManager( applicationScope );
- Observable<Edge> edges = gm.loadEdgeVersions( CpNamingUtils
+ Observable<MarkedEdge> edges = gm.loadEdgeVersions( CpNamingUtils
.createEdgeFromConnectionType( new SimpleId( headEntity.getUuid(), headEntity.getType() ), connectionType,
entityId ) );
@@ -288,7 +284,7 @@ public class CpRelationManager implements RelationManager {
} );
GraphManager gm = managerCache.getGraphManager( applicationScope );
- Observable<Edge> edges = gm.loadEdgeVersions( CpNamingUtils
+ Observable<MarkedEdge> edges = gm.loadEdgeVersions( CpNamingUtils
.createEdgeFromCollectionName( new SimpleId( headEntity.getUuid(), headEntity.getType() ), collectionName,
entityId ) );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index d3e0345..621edd2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -23,13 +23,16 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -42,18 +45,21 @@ import rx.Observable;
/**
* Command for reading graph edges
*/
-public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, Edge> {
+public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, MarkedEdge> {
private static final Logger logger = LoggerFactory.getLogger( AbstractReadGraphFilter.class );
private final GraphManagerFactory graphManagerFactory;
+ private final AsyncEventService asyncEventService;
/**
* Create a new instance of our command
*/
- public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory ) {
+ public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory,
+ final AsyncEventService asyncEventService ) {
this.graphManagerFactory = graphManagerFactory;
+ this.asyncEventService = asyncEventService;
}
@@ -61,9 +67,11 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> previousIds ) {
+ final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
+
//get the graph manager
final GraphManager graphManager =
- graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
+ graphManagerFactory.createEdgeManager( applicationScope );
final String edgeName = getEdgeTypeName();
@@ -74,18 +82,60 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
return previousIds.flatMap( previousFilterValue -> {
//set our constant state
- final Optional<Edge> startFromCursor = getSeekValue();
+ final Optional<MarkedEdge> startFromCursor = getSeekValue();
final Id id = previousFilterValue.getValue();
+ final Optional<Edge> typeWrapper = Optional.fromNullable(startFromCursor.orNull());
+
+ /**
+ * We do not want to filter. This is intentional DO NOT REMOVE!!!
+ *
+ * We want to fire events on these edges if they exist, because that means the delete was missed.
+ */
final SimpleSearchByEdgeType search =
new SimpleSearchByEdgeType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
- startFromCursor );
+ typeWrapper, false );
/**
* TODO, pass a message with pointers to our cursor values to be generated later
*/
- return graphManager.loadEdgesFromSource( search )
+ return graphManager.loadEdgesFromSource( search ).filter(markedEdge -> {
+
+ final boolean isDeleted = markedEdge.isDeleted();
+ final boolean isSourceNodeDeleted = markedEdge.isSourceNodeDelete();
+ final boolean isTargetNodeDelete = markedEdge.isTargetNodeDeleted();
+
+
+
+ if(isDeleted){
+ logger.trace( "Edge {} is deleted, queueing the delete event", markedEdge );
+ asyncEventService.queueDeleteEdge( applicationScope, markedEdge );
+ }
+
+ if(isSourceNodeDeleted){
+ final Id sourceNodeId = markedEdge.getSourceNode();
+
+ logger.trace( "Edge {} has a deleted source node, queueing the delete entity event for id {}", markedEdge, sourceNodeId );
+
+ asyncEventService.queueEntityDelete( applicationScope, sourceNodeId );
+ }
+
+ if(isTargetNodeDelete){
+
+ final Id targetNodeId = markedEdge.getTargetNode();
+
+ logger.trace( "Edge {} has a deleted target node, queueing the delete entity event for id {}", markedEdge, targetNodeId );
+
+ asyncEventService.queueEntityDelete( applicationScope, targetNodeId );
+ }
+
+
+ //filter the edge out if any of them are marked
+ return !isDeleted && !isSourceNodeDeleted && !isTargetNodeDelete;
+
+
+ })
//set the edge state for cursors
.doOnNext( edge -> {
logger.trace( "Seeking over edge {}", edge );
@@ -100,7 +150,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
@Override
- protected FilterResult<Id> createFilterResult( final Id emit, final Edge cursorValue,
+ protected FilterResult<Id> createFilterResult( final Id emit, final MarkedEdge cursorValue,
final Optional<EdgePath> parent ) {
//if it's our first pass, there's no cursor to generate
@@ -113,7 +163,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
@Override
- protected CursorSerializer<Edge> getCursorSerializer() {
+ protected CursorSerializer<MarkedEdge> getCursorSerializer() {
return EdgeCursorSerializer.INSTANCE;
}
@@ -131,14 +181,14 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
*/
private final class EdgeState {
- private Edge cursorEdge = null;
- private Edge currentEdge = null;
+ private MarkedEdge cursorEdge = null;
+ private MarkedEdge currentEdge = null;
/**
* Update the pointers
*/
- private void update( final Edge newEdge ) {
+ private void update( final MarkedEdge newEdge ) {
cursorEdge = currentEdge;
currentEdge = newEdge;
}
@@ -147,7 +197,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
/**
* Get the edge to use in cursors for resume
*/
- private Edge getCursorEdge() {
+ private MarkedEdge getCursorEdge() {
return cursorEdge;
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java
index 8d9bf6f..d54e547 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java
@@ -22,20 +22,22 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse;
import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer;
import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
/**
* Edge cursor serializer
*/
-public class EdgeCursorSerializer extends AbstractCursorSerializer<Edge> {
+public class EdgeCursorSerializer extends AbstractCursorSerializer<MarkedEdge> {
public static final EdgeCursorSerializer INSTANCE = new EdgeCursorSerializer();
@Override
- protected Class<SimpleEdge> getType() {
- return SimpleEdge.class;
+ protected Class<SimpleMarkedEdge> getType() {
+ return SimpleMarkedEdge.class;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
index dc39f5c..db5a0a8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import com.google.inject.Inject;
@@ -40,8 +41,8 @@ public class ReadGraphCollectionFilter extends AbstractReadGraphFilter {
* Create a new instance of our command
*/
@Inject
- public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String collectionName ) {
- super( graphManagerFactory );
+ public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final AsyncEventService asyncEventService, @Assisted final String collectionName ) {
+ super( graphManagerFactory, asyncEventService );
this.collectionName = collectionName;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java
index 61ba4ad..054a52b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java
@@ -26,6 +26,7 @@ import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -42,7 +43,7 @@ import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeType
/**
* Command for reading graph edges on a connection
*/
-public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, Edge>{
+public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, MarkedEdge>{
private final GraphManagerFactory graphManagerFactory;
private final String connectionName;
@@ -77,12 +78,14 @@ public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id,
return filterResultObservable.flatMap( idFilterResult -> {
//set our constant state
- final Optional<Edge> startFromCursor = getSeekValue();
+ final Optional<MarkedEdge> startFromCursor = getSeekValue();
final Id id = idFilterResult.getValue();
+ final Optional<Edge> typeWrapper = Optional.fromNullable(startFromCursor.orNull());
+
final SimpleSearchByIdType search =
new SimpleSearchByIdType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
- entityType, startFromCursor );
+ entityType, typeWrapper );
return graphManager.loadEdgesFromSourceByType( search ).map(
edge -> createFilterResult( edge.getTargetNode(), edge, idFilterResult.getPath() ));
@@ -91,7 +94,7 @@ public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id,
@Override
- protected CursorSerializer<Edge> getCursorSerializer() {
+ protected CursorSerializer<MarkedEdge> getCursorSerializer() {
return EdgeCursorSerializer.INSTANCE;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
index 11ec5f8..93e8fd4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import com.google.inject.Inject;
@@ -40,8 +41,8 @@ public class ReadGraphConnectionFilter extends AbstractReadGraphFilter {
* Create a new instance of our command
*/
@Inject
- public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String connectionName ) {
- super( graphManagerFactory );
+ public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, final AsyncEventService asyncEventService, @Assisted final String connectionName ) {
+ super( graphManagerFactory, asyncEventService );
this.connectionName = connectionName;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
index 2863cbf..74f9ce0 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.UUID;
import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.index.*;
import org.junit.Before;
import org.junit.Rule;
@@ -135,7 +136,7 @@ public abstract class AsyncIndexServiceTest {
*/
- final List<Edge> connectionSearchEdges = Observable.range( 0, 500 ).flatMap(integer -> {
+ final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, 500 ).flatMap(integer -> {
final Id connectingId = createId("connecting");
final Edge edge = CpNamingUtils.createConnectionEdge(connectingId, "likes", testEntity.getId());
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
index 6001dd4..90d6c5a 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.junit.Before;
import org.junit.Test;
@@ -244,7 +245,7 @@ public class IndexServiceTest {
// final int edgeCount = indexFig.getIndexBatchSize()*2;
final int edgeCount = 100;
- final List<Edge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> {
+ final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> {
final Id connectingId = createId( "connecting" );
final Edge edge = CpNamingUtils.createConnectionEdge( connectingId, "likes", testEntity.getId() );
@@ -377,7 +378,8 @@ public class IndexServiceTest {
//Write multiple connection edges
final int edgeCount = 5;
- final List<Edge> connectionSearchEdges = createConnectionSearchEdges( testEntity, graphManager, edgeCount );
+ final List<MarkedEdge>
+ connectionSearchEdges = createConnectionSearchEdges( testEntity, graphManager, edgeCount );
indexService.indexEntity( applicationScope, testEntity ).flatMap(mesage -> indexProducer.put(mesage)).toBlocking().getIterator();
@@ -485,10 +487,10 @@ public class IndexServiceTest {
}
- private List<Edge> createConnectionSearchEdges(
- final Entity testEntity, final GraphManager graphManager, final int edgeCount ) {
+ private List<MarkedEdge> createConnectionSearchEdges( final Entity testEntity, final GraphManager graphManager,
+ final int edgeCount ) {
- final List<Edge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> {
+ final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> {
//create our connection edge.
final Id connectingId = createId( "connecting" );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
index 7128dcf..c9dcbf1 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
@@ -28,7 +28,9 @@ import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
import org.apache.usergrid.corepersistence.pipeline.read.search.ElasticsearchCursorSerializer;
import org.apache.usergrid.corepersistence.pipeline.read.traverse.EdgeCursorSerializer;
import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
import com.google.common.base.Optional;
@@ -40,19 +42,12 @@ public class CursorTest {
@Test
public void testCursors(){
+ //test encoding edge
+ final MarkedEdge edge1 = new SimpleMarkedEdge( createId("source1"), "edgeType1", createId("target1"), 100, false, false, false );
-
-
-
-
- //test encoding edge
-
- final Edge edge1 = new SimpleEdge( createId("source1"), "edgeType1", createId("target1"), 100 );
-
-
- final Edge edge2 = new SimpleEdge( createId("source2"), "edgeType2", createId("target2"), 110 );
+ final MarkedEdge edge2 = new SimpleMarkedEdge( createId("source2"), "edgeType2", createId("target2"), 110, false, false, false );
@@ -64,11 +59,12 @@ public class CursorTest {
final EdgePath<Integer> filter3Path = new EdgePath<>( 3, query2, ElasticsearchCursorSerializer.INSTANCE, Optional.absent() );
- final EdgePath<Edge> filter2Path = new EdgePath<Edge>(2, edge2, EdgeCursorSerializer.INSTANCE, Optional.of( filter3Path ));
+ final EdgePath<MarkedEdge> filter2Path =
+ new EdgePath<>( 2, edge2, EdgeCursorSerializer.INSTANCE, Optional.of( filter3Path ) );
final EdgePath<Integer> filter1Path = new EdgePath<>( 1, query1, ElasticsearchCursorSerializer.INSTANCE, Optional.of(filter2Path) );
- final EdgePath<Edge> filter0Path = new EdgePath<>( 0, edge1, EdgeCursorSerializer.INSTANCE, Optional.of( filter1Path ) );
+ final EdgePath<MarkedEdge> filter0Path = new EdgePath<>( 0, edge1, EdgeCursorSerializer.INSTANCE, Optional.of( filter1Path ) );
@@ -91,7 +87,7 @@ public class CursorTest {
assertEquals(query2, parsedQuery2);
- final Edge parsedEdge2 = requestCursor.getCursor( 2, EdgeCursorSerializer.INSTANCE );
+ final MarkedEdge parsedEdge2 = requestCursor.getCursor( 2, EdgeCursorSerializer.INSTANCE );
assertEquals( edge2, parsedEdge2 );
@@ -100,7 +96,7 @@ public class CursorTest {
assertEquals( query1, parsedQuery1 );
- final Edge parsedEdge1 = requestCursor.getCursor( 0, EdgeCursorSerializer.INSTANCE );
+ final MarkedEdge parsedEdge1 = requestCursor.getCursor( 0, EdgeCursorSerializer.INSTANCE );
assertEquals(edge1, parsedEdge1);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
index 326e128..6929d87 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
@@ -36,6 +36,7 @@ import org.apache.usergrid.persistence.core.test.UseModules;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
@@ -118,7 +119,7 @@ public class ConnectionServiceImplTest {
new SimpleSearchByEdge( source, connectionEdge.getType(), target, Long.MAX_VALUE,
SearchByEdgeType.Order.DESCENDING, Optional.absent() );
- final List<Edge> edges = gm.loadEdgeVersions( simpleSearchByEdge ).toList().toBlocking().last();
+ final List<MarkedEdge> edges = gm.loadEdgeVersions( simpleSearchByEdge ).toList().toBlocking().last();
assertEquals( 1, edges.size() );
@@ -209,7 +210,7 @@ public class ConnectionServiceImplTest {
SearchByEdgeType.Order.DESCENDING, Optional.absent() );
//check only 1 exists
- final List<Edge> edges = gm.loadEdgeVersions( simpleSearchByEdge ).toList().toBlocking().last();
+ final List<MarkedEdge> edges = gm.loadEdgeVersions( simpleSearchByEdge ).toList().toBlocking().last();
assertEquals( 1, edges.size() );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
index d870114..658d3eb 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
@@ -30,6 +30,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
@@ -86,7 +87,8 @@ public class ApplicationServiceIT extends AbstractCoreIT {
, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
Optional.<Edge>absent() );
- Iterator<Edge> results = graphManager.loadEdgesFromSource(simpleSearchByEdgeType).toBlocking().getIterator();
+ Iterator<MarkedEdge>
+ results = graphManager.loadEdgesFromSource(simpleSearchByEdgeType).toBlocking().getIterator();
if(results.hasNext()){
Assert.fail("should be empty");
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
index 6100725..000c633 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
@@ -59,7 +59,7 @@ public interface GraphManager extends CPManager {
* Create or update an edge. Note that the implementation should also create incoming (reversed) edges for this
* edge.
*/
- Observable<Edge> writeEdge( Edge edge );
+ Observable<MarkedEdge> writeEdge( Edge edge );
/**
@@ -68,7 +68,7 @@ public interface GraphManager extends CPManager {
*
* Implementation should also mark the incoming (reversed) edge. Only marks the specific version
*/
- Observable<Edge> markEdge( Edge edge );
+ Observable<MarkedEdge> markEdge( Edge edge );
/**
* @param edge Remove the edge in the graph
@@ -98,7 +98,7 @@ public interface GraphManager extends CPManager {
/**
* Get all versions of this edge where versions <= max version
*/
- Observable<Edge> loadEdgeVersions( SearchByEdge edge );
+ Observable<MarkedEdge> loadEdgeVersions( SearchByEdge edge );
/**
* Returns an observable that emits all edges where the specified node is the source node. The edges will match the
@@ -108,7 +108,7 @@ public interface GraphManager extends CPManager {
*
* @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
*/
- Observable<Edge> loadEdgesFromSource( SearchByEdgeType search );
+ Observable<MarkedEdge> loadEdgesFromSource( SearchByEdgeType search );
/**
* Returns an observable that emits all edges where the specified node is the target node. The edges will match the
@@ -118,7 +118,7 @@ public interface GraphManager extends CPManager {
*
* @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
*/
- Observable<Edge> loadEdgesToTarget( SearchByEdgeType search );
+ Observable<MarkedEdge> loadEdgesToTarget( SearchByEdgeType search );
/**
@@ -129,7 +129,7 @@ public interface GraphManager extends CPManager {
*
* @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
*/
- Observable<Edge> loadEdgesFromSourceByType( SearchByIdType search );
+ Observable<MarkedEdge> loadEdgesFromSourceByType( SearchByIdType search );
/**
@@ -140,7 +140,7 @@ public interface GraphManager extends CPManager {
*
* @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
*/
- Observable<Edge> loadEdgesToTargetByType( SearchByIdType search );
+ Observable<MarkedEdge> loadEdgesToTargetByType( SearchByIdType search );
/**
* Get all edge types to this node. The node provided by search is the target node.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
index da6fedb..88809e2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
@@ -30,6 +30,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
* An edge, with the additional information of whether it is marked for deletion.
*
*/
+@JsonDeserialize(as = SimpleMarkedEdge.class)
public interface MarkedEdge extends Edge{
/**
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 1bcb398..93ae753 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -144,12 +144,12 @@ public class GraphManagerImpl implements GraphManager {
@Override
- public Observable<Edge> writeEdge( final Edge edge ) {
+ public Observable<MarkedEdge> writeEdge( final Edge edge ) {
GraphValidation.validateEdge( edge );
final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, false );
- final Observable<Edge> observable = Observable.just( markedEdge ).map( edge1 -> {
+ final Observable<MarkedEdge> observable = Observable.just( markedEdge ).map( edge1 -> {
final UUID timestamp = UUIDGenerator.newTimeUUID();
@@ -175,12 +175,12 @@ public class GraphManagerImpl implements GraphManager {
@Override
- public Observable<Edge> markEdge( final Edge edge ) {
+ public Observable<MarkedEdge> markEdge( final Edge edge ) {
GraphValidation.validateEdge( edge );
final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, true );
- final Observable<Edge> observable = Observable.just( markedEdge ).map( edge1 -> {
+ final Observable<MarkedEdge> observable = Observable.just( markedEdge ).map( edge1 -> {
final UUID timestamp = UUIDGenerator.newTimeUUID();
@@ -282,9 +282,9 @@ public class GraphManagerImpl implements GraphManager {
@Override
- public Observable<Edge> loadEdgeVersions( final SearchByEdge searchByEdge ) {
+ public Observable<MarkedEdge> loadEdgeVersions( final SearchByEdge searchByEdge ) {
- final Observable<Edge> edges =
+ final Observable<MarkedEdge> edges =
Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
@@ -298,8 +298,8 @@ public class GraphManagerImpl implements GraphManager {
@Override
- public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
- final Observable<Edge> edges =
+ public Observable<MarkedEdge> loadEdgesFromSource( final SearchByEdgeType search ) {
+ final Observable<MarkedEdge> edges =
Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesFromSource" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
@@ -313,8 +313,8 @@ public class GraphManagerImpl implements GraphManager {
@Override
- public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
- final Observable<Edge> edges =
+ public Observable<MarkedEdge> loadEdgesToTarget( final SearchByEdgeType search ) {
+ final Observable<MarkedEdge> edges =
Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesToTarget" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
@@ -329,8 +329,8 @@ public class GraphManagerImpl implements GraphManager {
@Override
- public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
- final Observable<Edge> edges =
+ public Observable<MarkedEdge> loadEdgesFromSourceByType( final SearchByIdType search ) {
+ final Observable<MarkedEdge> edges =
Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesFromSourceByType" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
@@ -344,8 +344,8 @@ public class GraphManagerImpl implements GraphManager {
@Override
- public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
- final Observable<Edge> edges =
+ public Observable<MarkedEdge> loadEdgesToTargetByType( final SearchByIdType search ) {
+ final Observable<MarkedEdge> edges =
Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesToTargetByType" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
@@ -480,7 +480,7 @@ public class GraphManagerImpl implements GraphManager {
}
//if any one of these is true, we filter it
- return !(simpleMarkedEdge.isDeleted() || simpleMarkedEdge.isSourceNodeDelete() || simpleMarkedEdge.isTargetNodeDeleted());
+ return !simpleMarkedEdge.isDeleted() && !simpleMarkedEdge.isSourceNodeDelete() && !simpleMarkedEdge.isTargetNodeDeleted();
});
} );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
index c6dc2e4..9c35e2e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
@@ -29,50 +29,61 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
/**
* Simple bean to represent our edge
+ *
* @author tnine
*/
-public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge {
+public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge {
+
+ private boolean isDeleted;
+ private boolean isSourceNodeDeleted;
+ private boolean isTargetNodeDeleted;
- private final boolean deleted;
- private final boolean isSourceNodeDeleted;
- private final boolean isTargetNodeDeleted;
+ /**
+ * Unused but required for Jackson
+ */
+ public SimpleMarkedEdge() {
+ }
- public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp, final boolean deleted) {
- this( sourceNode, type, targetNode, timestamp, deleted, false, false );
+ public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp,
+ final boolean isDeleted ) {
+
+ this( sourceNode, type, targetNode, timestamp, isDeleted, false, false );
}
public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp,
- final boolean deleted, final boolean isSourceNodeDeleted,
+ final boolean isDeleted, final boolean isSourceNodeDeleted,
final boolean isTargetNodeDeleted ) {
super( sourceNode, type, targetNode, timestamp );
- this.deleted = deleted;
+ this.isDeleted = isDeleted;
this.isSourceNodeDeleted = isSourceNodeDeleted;
this.isTargetNodeDeleted = isTargetNodeDeleted;
}
- public SimpleMarkedEdge(final Edge edge, final boolean deleted){
- this(edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), deleted);
+ public SimpleMarkedEdge( final Edge edge, final boolean isDeleted ) {
+ this( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), isDeleted );
}
@Override
@JsonIgnore
public boolean isDeleted() {
- return deleted;
+ return isDeleted;
}
@Override
+ @JsonIgnore
public boolean isSourceNodeDelete() {
return isSourceNodeDeleted;
}
@Override
+ @JsonIgnore
public boolean isTargetNodeDeleted() {
return isTargetNodeDeleted;
}
@@ -92,7 +103,7 @@ public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge {
final SimpleMarkedEdge that = ( SimpleMarkedEdge ) o;
- if ( deleted != that.deleted ) {
+ if ( isDeleted != that.isDeleted ) {
return false;
}
@@ -103,7 +114,7 @@ public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge {
@Override
public int hashCode() {
int result = super.hashCode();
- result = 31 * result + ( deleted ? 1 : 0 );
+ result = 31 * result + ( isDeleted ? 1 : 0 );
result = 31 * result + ( isSourceNodeDeleted ? 1 : 0 );
result = 31 * result + ( isTargetNodeDeleted ? 1 : 0 );
return result;
@@ -113,7 +124,7 @@ public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge {
@Override
public String toString() {
return "SimpleMarkedEdge{" +
- "deleted=" + deleted +
+ "deleted=" + isDeleted +
", isSourceNodeDeleted=" + isSourceNodeDeleted +
", isTargetNodeDeleted=" + isTargetNodeDeleted +
"} " + super.toString();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
index eda3a02..9a95c26 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
@@ -95,7 +95,7 @@ public class GraphManagerIT {
SearchByEdgeType search = createSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTimestamp(), null );
- Observable<Edge> edges = gm.loadEdgesFromSource( search );
+ Observable<MarkedEdge> edges = gm.loadEdgesFromSource( search );
//takes the last emitted edge; note that "last" (unlike "single") does not blow up if more than 1 is returned
Edge returned = edges.toBlocking().last();
@@ -127,7 +127,7 @@ public class GraphManagerIT {
SearchByEdgeType search = createSearchByEdge( edge.getTargetNode(), edge.getType(), edge.getTimestamp(), null );
- Observable<Edge> edges = gm.loadEdgesToTarget( search );
+ Observable<MarkedEdge> edges = gm.loadEdgesToTarget( search );
//implicitly blows up if more than 1 is returned from "single"
Edge returned = edges.toBlocking().single();
@@ -161,7 +161,7 @@ public class GraphManagerIT {
SearchByEdgeType search = createSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTimestamp(), null );
- Observable<Edge> edges = gm.loadEdgesFromSource( search );
+ Observable<MarkedEdge> edges = gm.loadEdgesFromSource( search );
//implicitly blows up if more than 1 is returned from "single"
Edge returned = edges.toBlocking().single();
@@ -196,7 +196,7 @@ public class GraphManagerIT {
SearchByEdgeType search = createSearchByEdge( edge.getTargetNode(), edge.getType(), edge.getTimestamp(), null );
- Observable<Edge> edges = gm.loadEdgesToTarget( search );
+ Observable<MarkedEdge> edges = gm.loadEdgesToTarget( search );
//implicitly blows up if more than 1 is returned from "single"
Edge returned = edges.toBlocking().single();
@@ -248,10 +248,10 @@ public class GraphManagerIT {
SearchByEdgeType search =
createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getTimestamp(), null );
- Observable<Edge> edges = gm.loadEdgesFromSource( search );
+ Observable<MarkedEdge> edges = gm.loadEdgesFromSource( search );
//iterate over the returned edges in the order they are emitted
- Iterator<Edge> returned = edges.toBlocking().getIterator();
+ Iterator<MarkedEdge> returned = edges.toBlocking().getIterator();
assertEquals( "Correct edge returned", edge3, returned.next() );
assertEquals( "Correct edge returned", edge2, returned.next() );
@@ -321,10 +321,10 @@ public class GraphManagerIT {
SearchByEdgeType search =
createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getTimestamp(), null );
- Observable<Edge> edges = gm.loadEdgesToTarget( search );
+ Observable<MarkedEdge> edges = gm.loadEdgesToTarget( search );
//iterate over the returned edges in the order they are emitted
- Iterator<Edge> returned = edges.toBlocking().getIterator();
+ Iterator<MarkedEdge> returned = edges.toBlocking().getIterator();
assertEquals( "Correct edge returned", edge3, returned.next() );
assertEquals( "Correct edge returned", edge2, returned.next() );
@@ -387,10 +387,10 @@ public class GraphManagerIT {
SearchByEdgeType search =
createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getTimestamp(), null );
- Observable<Edge> edges = gm.loadEdgesFromSource( search );
+ Observable<MarkedEdge> edges = gm.loadEdgesFromSource( search );
//iterate over the returned edges in the order they are emitted
- Iterator<Edge> returned = edges.toBlocking().getIterator();
+ Iterator<MarkedEdge> returned = edges.toBlocking().getIterator();
//we have 3 edges, but we specified our first edge as the max, we shouldn't get any more results than the first
@@ -443,10 +443,10 @@ public class GraphManagerIT {
SearchByEdgeType search =
createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getTimestamp(), null );
- Observable<Edge> edges = gm.loadEdgesToTarget( search );
+ Observable<MarkedEdge> edges = gm.loadEdgesToTarget( search );
//iterate over the returned edges in the order they are emitted
- Iterator<Edge> returned = edges.toBlocking().getIterator();
+ Iterator<MarkedEdge> returned = edges.toBlocking().getIterator();
//we have 3 edges, but we specified our first edge as the max, we shouldn't get any more results than the first
@@ -487,7 +487,7 @@ public class GraphManagerIT {
SearchByIdType search = createSearchByEdgeAndId( edge.getSourceNode(), edge.getType(), edge.getTimestamp(),
edge.getTargetNode().getType(), null );
- Observable<Edge> edges = gm.loadEdgesFromSourceByType( search );
+ Observable<MarkedEdge> edges = gm.loadEdgesFromSourceByType( search );
//implicitly blows up if more than 1 is returned from "single"
Edge returned = edges.toBlocking().single();
@@ -524,7 +524,7 @@ public class GraphManagerIT {
SearchByIdType search = createSearchByEdgeAndId( edge.getTargetNode(), edge.getType(), edge.getTimestamp(),
edge.getSourceNode().getType(), null );
- Observable<Edge> edges = gm.loadEdgesToTargetByType( search );
+ Observable<MarkedEdge> edges = gm.loadEdgesToTargetByType( search );
//implicitly blows up if more than 1 is returned from "single"
Edge returned = edges.toBlocking().single();
@@ -560,7 +560,7 @@ public class GraphManagerIT {
SearchByEdgeType search = createSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTimestamp(), null );
- Observable<Edge> edges = gm.loadEdgesFromSource( search );
+ Observable<MarkedEdge> edges = gm.loadEdgesFromSource( search );
//implicitly blows up if more than 1 is returned from "single"
Edge returned = edges.toBlocking().single();
@@ -629,7 +629,7 @@ public class GraphManagerIT {
SearchByEdgeType search = createSearchByEdge( edge.getTargetNode(), edge.getType(), edge.getTimestamp(), null );
- Observable<Edge> edges = gm.loadEdgesToTarget( search );
+ Observable<MarkedEdge> edges = gm.loadEdgesToTarget( search );
//implicitly blows up if more than 1 is returned from "single"
Edge returned = edges.toBlocking().single();
@@ -988,11 +988,11 @@ public class GraphManagerIT {
//get our 2 edges
- Observable<Edge> edges =
+ Observable<MarkedEdge> edges =
gm.loadEdgesFromSource( createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
- Iterator<Edge> results = edges.toBlocking().getIterator();
+ Iterator<MarkedEdge> results = edges.toBlocking().getIterator();
assertEquals( "Edges correct", edge2, results.next() );
@@ -1060,11 +1060,11 @@ public class GraphManagerIT {
//get our 2 edges
- Observable<Edge> edges =
+ Observable<MarkedEdge> edges =
gm.loadEdgesToTarget( createSearchByEdge( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) );
- Iterator<Edge> results = edges.toBlocking().getIterator();
+ Iterator<MarkedEdge> results = edges.toBlocking().getIterator();
assertEquals( "Edges correct", edge2, results.next() );
@@ -1140,11 +1140,11 @@ public class GraphManagerIT {
//get our 2 edges
- Observable<Edge> edges = gm.loadEdgesFromSource(
+ Observable<MarkedEdge> edges = gm.loadEdgesFromSource(
createSearchByEdgeUnfiltered( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) );
- Iterator<Edge> results = edges.toBlocking().getIterator();
+ Iterator<MarkedEdge> results = edges.toBlocking().getIterator();
assertEquals( "Edges correct", edge2.getTargetNode(), results.next().getTargetNode() );
@@ -1211,11 +1211,11 @@ public class GraphManagerIT {
gm.markEdge( edge1 ).toBlocking().last();
//get our 2 edges
- Observable<Edge> edges = gm.loadEdgesToTarget(
+ Observable<MarkedEdge> edges = gm.loadEdgesToTarget(
createSearchByEdgeUnfiltered( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) );
- Iterator<Edge> results = edges.toBlocking().getIterator();
+ Iterator<MarkedEdge> results = edges.toBlocking().getIterator();
assertEquals( "Edges correct", edge2.getSourceNode(), results.next().getSourceNode() );
@@ -1282,11 +1282,11 @@ public class GraphManagerIT {
//get our 2 edges
- Observable<Edge> edges = gm.loadEdgesFromSourceByType(
+ Observable<MarkedEdge> edges = gm.loadEdgesFromSourceByType(
createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) );
- Iterator<Edge> results = edges.toBlocking().getIterator();
+ Iterator<MarkedEdge> results = edges.toBlocking().getIterator();
assertEquals( "Edges correct", edge1, results.next() );
@@ -1359,11 +1359,11 @@ public class GraphManagerIT {
final long maxVersion = System.currentTimeMillis();
//get our 2 edges
- Observable<Edge> edges = gm.loadEdgesToTargetByType(
+ Observable<MarkedEdge> edges = gm.loadEdgesToTargetByType(
createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) );
- Iterator<Edge> results = edges.toBlocking().getIterator();
+ Iterator<MarkedEdge> results = edges.toBlocking().getIterator();
assertEquals( "Edges correct", edge1, results.next() );
@@ -1435,7 +1435,7 @@ public class GraphManagerIT {
final long maxVersion = System.currentTimeMillis();
- Iterator<Edge> results =
+ Iterator<MarkedEdge> results =
gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking()
.getIterator();
@@ -1547,7 +1547,7 @@ public class GraphManagerIT {
final long maxVersion = System.currentTimeMillis();
- Iterator<Edge> results =
+ Iterator<MarkedEdge> results =
gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking()
.getIterator();
@@ -1627,7 +1627,7 @@ public class GraphManagerIT {
final long maxVersion = System.currentTimeMillis();
- Iterator<Edge> results =
+ Iterator<MarkedEdge> results =
gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking()
.getIterator();
@@ -1764,7 +1764,7 @@ public class GraphManagerIT {
final long maxVersion = System.currentTimeMillis();
- Iterator<Edge> results =
+ Iterator<MarkedEdge> results =
gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking()
.getIterator();
@@ -1842,7 +1842,7 @@ public class GraphManagerIT {
final long maxVersion = System.currentTimeMillis();
- Iterator<Edge> results =
+ Iterator<MarkedEdge> results =
gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking()
.getIterator();
@@ -1983,7 +1983,7 @@ public class GraphManagerIT {
final long maxVersion = System.currentTimeMillis();
- Iterator<Edge> results =
+ Iterator<MarkedEdge> results =
gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking()
.getIterator();
@@ -2356,10 +2356,10 @@ public class GraphManagerIT {
new SimpleSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getTargetNode(), Long.MAX_VALUE,
SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() );
- final Observable<Edge> edgesDescending = gm.loadEdgeVersions( searchDescending );
+ final Observable<MarkedEdge> edgesDescending = gm.loadEdgeVersions( searchDescending );
//search descending
- final List<Edge> descending = edgesDescending.toList().toBlocking().single();
+ final List<MarkedEdge> descending = edgesDescending.toList().toBlocking().single();
assertEquals( "Correct size returned", 3, descending.size() );
@@ -2376,9 +2376,9 @@ public class GraphManagerIT {
new SimpleSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getTargetNode(), 0,
SearchByEdgeType.Order.ASCENDING, Optional.<Edge>absent() );
- Observable<Edge> edgesAscending = gm.loadEdgeVersions( searchAscending );
+ Observable<MarkedEdge> edgesAscending = gm.loadEdgeVersions( searchAscending );
- List<Edge> ascending = edgesAscending.toList().toBlocking().single();
+ List<MarkedEdge> ascending = edgesAscending.toList().toBlocking().single();
assertEquals( "Correct size returned", 3, ascending.size() );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java
index b922e7c..22683f6 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java
@@ -113,7 +113,7 @@ public class GraphManagerLoadTest {
@Override
- public Observable<Edge> doSearch( final GraphManager manager ) {
+ public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, Optional
.<Edge>absent()) );
}
@@ -141,7 +141,7 @@ public class GraphManagerLoadTest {
@Override
- public Observable<Edge> doSearch( final GraphManager manager ) {
+ public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) );
}
};
@@ -220,7 +220,7 @@ public class GraphManagerLoadTest {
final CountDownLatch latch = new CountDownLatch( 1 );
- generator.doSearch( manager ).take( readCount ).buffer( 1000 ).subscribe( new Subscriber<List<Edge>>() {
+ generator.doSearch( manager ).take( readCount ).buffer( 1000 ).subscribe( new Subscriber<List<MarkedEdge>>() {
@Override
public void onCompleted() {
timer.stop();
@@ -235,7 +235,7 @@ public class GraphManagerLoadTest {
@Override
- public void onNext( final List<Edge> edges ) {
+ public void onNext( final List<MarkedEdge> edges ) {
log.info("Read {} edges", edges.size());
}
} );
@@ -263,6 +263,6 @@ public class GraphManagerLoadTest {
* @param manager
* @return
*/
- public Observable<Edge> doSearch( final GraphManager manager );
+ public Observable<MarkedEdge> doSearch( final GraphManager manager );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index c1917bb..6aad289 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -176,7 +176,7 @@ public class GraphManagerShardConsistencyIT {
@Override
- public Observable<Edge> doSearch( final GraphManager manager ) {
+ public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
return manager.loadEdgesFromSource(
new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
Optional.<Edge>absent() ) );
@@ -409,7 +409,7 @@ public class GraphManagerShardConsistencyIT {
@Override
- public Observable<Edge> doSearch( final GraphManager manager ) {
+ public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
return manager.loadEdgesFromSource(
new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
Optional.<Edge>absent(), false ) );
@@ -729,7 +729,7 @@ public class GraphManagerShardConsistencyIT {
/**
* Perform the search returning an observable edge
*/
- public Observable<Edge> doSearch( final GraphManager manager );
+ public Observable<MarkedEdge> doSearch( final GraphManager manager );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
index 2889684..6a2efc9 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
@@ -107,25 +107,25 @@ public class GraphManagerStressTest {
@Override
- public Observable<Edge> doSearch( final GraphManager manager ) {
+ public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
final long timestamp = System.currentTimeMillis();
- return Observable.create( new Observable.OnSubscribe<Edge>() {
+ return Observable.create( new Observable.OnSubscribe<MarkedEdge>() {
@Override
- public void call( final Subscriber<? super Edge> subscriber ) {
+ public void call( final Subscriber<? super MarkedEdge> subscriber ) {
try {
for ( Id sourceId : sourceIds ) {
- final Iterable<Edge> edges = manager.loadEdgesFromSource(
+ final Iterable<MarkedEdge> edges = manager.loadEdgesFromSource(
new SimpleSearchByEdgeType( sourceId, "test", timestamp, SearchByEdgeType.Order.DESCENDING, Optional
.<Edge>absent() ) )
.toBlocking().toIterable();
- for ( Edge edge : edges ) {
+ for ( MarkedEdge edge : edges ) {
log.debug( "Firing on next for edge {}", edge );
subscriber.onNext( edge );
@@ -195,7 +195,7 @@ public class GraphManagerStressTest {
@Override
- public Observable<Edge> doSearch( final GraphManager manager ) {
+ public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) );
}
};
@@ -222,7 +222,7 @@ public class GraphManagerStressTest {
@Override
- public Observable<Edge> doSearch( final GraphManager manager ) {
+ public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) );
}
@@ -307,6 +307,6 @@ public class GraphManagerStressTest {
*/
public Edge newEdge();
- public Observable<Edge> doSearch( final GraphManager manager );
+ public Observable<MarkedEdge> doSearch( final GraphManager manager );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java b/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
index d9d3d0d..6e84601 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java
@@ -47,6 +47,7 @@ import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsExcept
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
@@ -239,7 +240,7 @@ public class AppInfoMigrationPlugin implements MigrationPlugin {
final EntityCollectionManager managementCollectionManager =
entityCollectionManagerFactory.createCollectionManager( managementAppScope );
- Observable<Edge> edgesObservable = getApplicationInfoEdges( appId );
+ Observable<MarkedEdge> edgesObservable = getApplicationInfoEdges( appId );
//get the graph for all app infos
Observable<org.apache.usergrid.persistence.model.entity.Entity> entityObs = edgesObservable.flatMap( edge -> {
final Id appInfoId = edge.getTargetNode();
@@ -299,7 +300,7 @@ public class AppInfoMigrationPlugin implements MigrationPlugin {
}
- public Observable<Edge> getApplicationInfoEdges( final UUID applicationId ) {
+ public Observable<MarkedEdge> getApplicationInfoEdges( final UUID applicationId ) {
final ApplicationScope managementAppScope = getApplicationScope( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
final GraphManager gm = graphManagerFactory.createEdgeManager( managementAppScope );