You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/27 22:21:28 UTC
[14/50] [abbrv] Merged hystrix into asyncqueue
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerialization.java
index 548d821..9344e70 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerialization.java
@@ -65,12 +65,12 @@ public interface EdgeSerialization {
/**
- * Search for specific versions of the edge from source->target. Will return all versions.
+ * Search for all versions of this edge < the search version. Returns all versions
* @param scope
* @param search
* @return
*/
- Iterator<MarkedEdge> getEdgeFromSource( OrganizationScope scope, SearchByEdge search );
+ Iterator<MarkedEdge> getEdgeVersions( OrganizationScope scope, SearchByEdge search );
/**
* Get an iterator of all edges by edge type originating from source node
@@ -97,14 +97,6 @@ public interface EdgeSerialization {
*/
Iterator<MarkedEdge> getEdgesToTarget( OrganizationScope scope, SearchByEdgeType edgeType );
- /**
- * Search for specific versions of the edge from source->target. Will return all versions
- * @param scope
- * @param search
- * @return
- */
- Iterator<MarkedEdge> getEdgeToTarget( OrganizationScope scope, SearchByEdge search );
-
/**
* Get an iterator of all edges by edge type pointing to the target node. Also uses the source id type to limit the
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index 7e11206..d255929 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -85,6 +85,8 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
//row key with target id type
private static final RowTypeSerializer ROW_TYPE_SERIALIZER = new RowTypeSerializer();
+ private static final EdgeRowKeySerializer EDGE_ROW_KEY_SERIALIZER = new EdgeRowKeySerializer();
+
//Edge serializers
private static final EdgeSerializer EDGE_SERIALIZER = new EdgeSerializer();
@@ -124,6 +126,15 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
new OrganizationScopedRowKeySerializer<RowKeyType>( ROW_TYPE_SERIALIZER ), EDGE_SERIALIZER );
+ /**
+ * Get all graph edge versions
+ */
+ private static final MultiTennantColumnFamily<OrganizationScope, EdgeRowKey, UUID> GRAPH_EDGE_VERSIONS =
+ new MultiTennantColumnFamily<OrganizationScope, EdgeRowKey, UUID>( "Graph_Edge_Versions",
+ new OrganizationScopedRowKeySerializer<EdgeRowKey>( EDGE_ROW_KEY_SERIALIZER ),
+ UUIDSerializer.get() );
+
+
protected final Keyspace keyspace;
protected final CassandraConfig cassandraConfig;
protected final GraphFig graphFig;
@@ -140,16 +151,23 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
@Override
public MutationBatch writeEdge( final OrganizationScope scope, final Edge edge ) {
- final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() );;
+ final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL());
- doWrite( scope, edge, new RowOp() {
+ doWrite( scope, edge, new RowOp<RowKey>() {
@Override
- public void doWrite( final MultiTennantColumnFamily columnFamily, final Object rowKey,
- final DirectedEdge edge ) {
-
+ public <R extends RowKey> void doWrite(
+ final MultiTennantColumnFamily<OrganizationScope, R, DirectedEdge> columnFamily, final R rowKey,
+ final DirectedEdge edge ) {
batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( edge, false );
}
+
+
+ @Override
+ public void doWrite( final MultiTennantColumnFamily<OrganizationScope, EdgeRowKey, UUID> columnFamily,
+ final EdgeRowKey rowKey, final UUID version ) {
+ batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( version, false );
+ }
} );
@@ -159,16 +177,23 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
@Override
public MutationBatch markEdge( final OrganizationScope scope, final Edge edge ) {
- final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() );;
+ final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() );
- doWrite( scope, edge, new RowOp() {
+ doWrite( scope, edge, new RowOp<RowKey>() {
@Override
- public void doWrite( final MultiTennantColumnFamily columnFamily, final Object rowKey,
- final DirectedEdge edge ) {
-
+ public <R extends RowKey> void doWrite(
+ final MultiTennantColumnFamily<OrganizationScope, R, DirectedEdge> columnFamily, final R rowKey,
+ final DirectedEdge edge ) {
batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( edge, true );
}
+
+
+ @Override
+ public void doWrite( final MultiTennantColumnFamily<OrganizationScope, EdgeRowKey, UUID> columnFamily,
+ final EdgeRowKey rowKey, final UUID version ) {
+ batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( version, true );
+ }
} );
@@ -181,13 +206,20 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() );
- doWrite( scope, edge, new RowOp() {
+ doWrite( scope, edge, new RowOp<RowKey>() {
@Override
- public void doWrite( final MultiTennantColumnFamily columnFamily, final Object rowKey,
- final DirectedEdge edge ) {
-
+ public <R extends RowKey> void doWrite(
+ final MultiTennantColumnFamily<OrganizationScope, R, DirectedEdge> columnFamily, final R rowKey,
+ final DirectedEdge edge ) {
batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( edge );
}
+
+
+ @Override
+ public void doWrite( final MultiTennantColumnFamily<OrganizationScope, EdgeRowKey, UUID> columnFamily,
+ final EdgeRowKey rowKey, final UUID version ) {
+ batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( version );
+ }
} );
@@ -197,7 +229,8 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
/**
* EdgeWrite the edges internally
- * @param scope The scope to encapsulate
+ *
+ * @param scope The scope to encapsulate
* @param edge The edge to write
* @param op The row operation to invoke
*/
@@ -229,23 +262,35 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
final DirectedEdge targetEdge = new DirectedEdge( sourceNodeId, version );
+ final EdgeRowKey edgeRowKey = new EdgeRowKey( sourceNodeId, type, targetNodeId );
+
+
/**
- * EdgeWrite edges from target<-source
+ * write edges from source->target
*/
-
op.doWrite( GRAPH_SOURCE_NODE_EDGES, sourceRowKey, sourceEdge );
op.doWrite( GRAPH_SOURCE_NODE_TARGET_TYPE, sourceRowKeyType, sourceEdge );
+
+ /**
+ * write edges from target<-source
+ */
op.doWrite( GRAPH_TARGET_NODE_EDGES, targetRowKey, targetEdge );
op.doWrite( GRAPH_TARGET_NODE_SOURCE_TYPE, targetRowKeyType, targetEdge );
+
+
+ /**
+ * Write this in the version log for this edge of source->target
+ */
+ op.doWrite( GRAPH_EDGE_VERSIONS, edgeRowKey, version );
}
@Override
- public Iterator<MarkedEdge> getEdgeFromSource( final OrganizationScope scope, final SearchByEdge search ) {
+ public Iterator<MarkedEdge> getEdgeVersions( final OrganizationScope scope, final SearchByEdge search ) {
ValidationUtils.validateOrganizationScope( scope );
EdgeUtils.validateSearchByEdge( search );
@@ -387,52 +432,6 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
@Override
- public Iterator<MarkedEdge> getEdgeToTarget( final OrganizationScope scope, final SearchByEdge search ) {
- ValidationUtils.validateOrganizationScope( scope );
- EdgeUtils.validateSearchByEdge( search );
-
- final Id sourceId = search.sourceNode();
- final Id targetId = search.targetNode();
- final UUID maxVersion = search.getMaxVersion();
- final String type = search.getType();
-
- return getEdges( GRAPH_TARGET_NODE_EDGES, new EdgeSearcher<RowKey>( scope, search.last() ) {
-
-
- @Override
- public void setRange( final RangeBuilder builder ) {
- if ( last.isPresent() ) {
- super.setRange( builder );
- return;
- }
-
- //set the last value in the range based on the max version
- final DirectedEdge colValue = new DirectedEdge( sourceId, maxVersion );
- builder.setStart( colValue, EDGE_SERIALIZER );
- }
-
-
- @Override
- protected RowKey generateRowKey() {
- return new RowKey( targetId, type );
- }
-
-
- @Override
- protected DirectedEdge getStartColumn( final Edge last ) {
- return new DirectedEdge( last.getSourceNode(), last.getVersion() );
- }
-
-
- @Override
- protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
- return new SimpleMarkedEdge( edge.id, type, targetId, edge.version, marked );
- }
- } );
- }
-
-
- @Override
public Iterator<MarkedEdge> getEdgesToTargetBySourceType( final OrganizationScope scope,
final SearchByIdType edgeType ) {
@@ -490,20 +489,21 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
RowQuery<ScopedRowKey<OrganizationScope, R>, DirectedEdge> query =
- keyspace.prepareQuery( cf ).setConsistencyLevel( cassandraConfig.getReadCL() ).getKey( rowKey ).autoPaginate( true )
- .withColumnRange( rangeBuilder.build() );
-
+ keyspace.prepareQuery( cf ).setConsistencyLevel( cassandraConfig.getReadCL() ).getKey( rowKey )
+ .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
- return new ColumnNameIterator<DirectedEdge, MarkedEdge>( query, searcher,
- searcher.hasPage(), graphFig.getReadTimeout() );
+ return new ColumnNameIterator<DirectedEdge, MarkedEdge>( query, searcher, searcher.hasPage() );
}
@Override
public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
return Arrays.asList( graphCf( GRAPH_SOURCE_NODE_EDGES ), graphCf( GRAPH_TARGET_NODE_EDGES ),
- graphCf( GRAPH_SOURCE_NODE_TARGET_TYPE ), graphCf( GRAPH_TARGET_NODE_SOURCE_TYPE ) );
+ graphCf( GRAPH_SOURCE_NODE_TARGET_TYPE ), graphCf( GRAPH_TARGET_NODE_SOURCE_TYPE ),
+ new MultiTennantColumnFamilyDefinition( GRAPH_EDGE_VERSIONS, BytesType.class.getSimpleName(),
+ ColumnTypes.UUID_TYPE_REVERSED, BytesType.class.getSimpleName(),
+ MultiTennantColumnFamilyDefinition.CacheOption.KEYS ) );
}
@@ -511,8 +511,9 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
* Helper to generate an edge definition by the type
*/
private MultiTennantColumnFamilyDefinition graphCf( MultiTennantColumnFamily cf ) {
- return new MultiTennantColumnFamilyDefinition( cf,
- BytesType.class.getSimpleName(), ColumnTypes.DYNAMIC_COMPOSITE_TYPE, BytesType.class.getSimpleName() , MultiTennantColumnFamilyDefinition.CacheOption.KEYS);
+ return new MultiTennantColumnFamilyDefinition( cf, BytesType.class.getSimpleName(),
+ ColumnTypes.DYNAMIC_COMPOSITE_TYPE, BytesType.class.getSimpleName(),
+ MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
}
@@ -558,11 +559,12 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
Preconditions.checkArgument( composite.size() == 3, "Composite should have 3 elements" );
- //parse our id
- final Id id = ID_COL_SERIALIZER.fromComposite( composite, 0 );
-
//return the version
- final UUID version = composite.get( 2, UUID_SERIALIZER );
+ final UUID version = composite.get( 0, UUID_SERIALIZER );
+
+
+ //parse our id
+ final Id id = ID_COL_SERIALIZER.fromComposite( composite, 1 );
return new DirectedEdge( id, version );
@@ -575,11 +577,13 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
private DynamicComposite createComposite( DirectedEdge edge, AbstractComposite.ComponentEquality equality ) {
DynamicComposite composite = new DynamicComposite();
- ID_COL_SERIALIZER.toComposite( composite, edge.id );
-
//add our edge
composite.addComponent( edge.version, UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED, equality );
+
+ ID_COL_SERIALIZER.toComposite( composite, edge.id );
+
+
return composite;
}
}
@@ -646,6 +650,23 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
/**
+ * Used to store row keys by sourceId, targetId and edgeType
+ */
+ private static class EdgeRowKey {
+ public final Id sourceId;
+ public final Id targetId;
+ public final String edgeType;
+
+
+ private EdgeRowKey( final Id sourceId, final String edgeType, final Id targetId ) {
+ this.sourceId = sourceId;
+ this.targetId = targetId;
+ this.edgeType = edgeType;
+ }
+ }
+
+
+ /**
* Searcher to be used when performing the search. Performs I/O transformation as well as parsing for the iterator
*/
private static abstract class EdgeSearcher<R> implements ColumnParser<DirectedEdge, MarkedEdge> {
@@ -779,11 +800,51 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
/**
+ * Class to perform serialization for row keys from edges
+ */
+ private static class EdgeRowKeySerializer implements CompositeFieldSerializer<EdgeRowKey> {
+
+ private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+
+ @Override
+ public void toComposite( final CompositeBuilder builder, final EdgeRowKey key ) {
+
+ //add the row id to the composite
+ ID_SER.toComposite( builder, key.sourceId );
+ builder.addString( key.edgeType );
+ ID_SER.toComposite( builder, key.targetId );
+ }
+
+
+ @Override
+ public EdgeRowKey fromComposite( final CompositeParser composite ) {
+
+ final Id sourceId = ID_SER.fromComposite( composite );
+ final String edgeType = composite.readString();
+ final Id targetId = ID_SER.fromComposite( composite );
+
+ return new EdgeRowKey( sourceId, edgeType, targetId );
+ }
+ }
+
+
+ /**
* Simple callback to perform puts and deletes with a common row setup code
*/
private static interface RowOp<R> {
- void doWrite( final MultiTennantColumnFamily<OrganizationScope, R, DirectedEdge> columnFamily, R rowKey,
- DirectedEdge edge );
+ /**
+ * Write the edge with the given data
+ */
+ <R extends RowKey> void doWrite(
+ final MultiTennantColumnFamily<OrganizationScope, R, DirectedEdge> columnFamily, R rowKey,
+ DirectedEdge edge );
+
+ /**
+ * Write the edge into the version cf
+ */
+ void doWrite( final MultiTennantColumnFamily<OrganizationScope, EdgeRowKey, UUID> columnFamily,
+ EdgeRowKey rowKey, UUID version );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerIT.java
index 17a5f5a..379f56a 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerIT.java
@@ -46,6 +46,7 @@ import com.google.inject.Inject;
import rx.Observable;
import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createGetByEdge;
import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdge;
import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdgeAndId;
@@ -72,7 +73,7 @@ public class EdgeManagerIT {
@Inject
- protected EdgeManagerFactory emf;
+ protected GraphManagerFactory emf;
protected OrganizationScope scope;
@@ -93,7 +94,7 @@ public class EdgeManagerIT {
@Test
public void testWriteReadEdgeTypeSource() {
- EdgeManager em = emf.createEdgeManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
Edge edge = createEdge( "source", "test", "target" );
@@ -126,7 +127,7 @@ public class EdgeManagerIT {
@Test
public void testWriteReadEdgeTypeTarget() {
- EdgeManager em = emf.createEdgeManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
Edge edge = createEdge( "source", "test", "target" );
@@ -159,7 +160,7 @@ public class EdgeManagerIT {
@Test
public void testWriteReadEdgeTypeVersionSource() {
- EdgeManager em = emf.createEdgeManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
final UUID earlyVersion = UUIDGenerator.newTimeUUID();
@@ -194,7 +195,7 @@ public class EdgeManagerIT {
@Test
public void testWriteReadEdgeTypeVersionTarget() {
- EdgeManager em = emf.createEdgeManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
final UUID earlyVersion = UUIDGenerator.newTimeUUID();
@@ -233,7 +234,7 @@ public class EdgeManagerIT {
@Test
public void testWriteReadEdgeTypeVersionSourceDistinct() {
- EdgeManager em = emf.createEdgeManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
final UUID earlyVersion = UUIDGenerator.newTimeUUID();
@@ -266,6 +267,8 @@ public class EdgeManagerIT {
Iterator<Edge> returned = edges.toBlockingObservable().getIterator();
assertEquals( "Correct edge returned", edge3, returned.next() );
+ assertEquals( "Correct edge returned", edge2, returned.next() );
+ assertEquals( "Correct edge returned", edge1, returned.next() );
assertFalse( "No more edges", returned.hasNext() );
//now test with an earlier version, we shouldn't get the edge back
@@ -276,6 +279,7 @@ public class EdgeManagerIT {
returned = edges.toBlockingObservable().getIterator();
assertEquals( "Correct edge returned", edge2, returned.next() );
+ assertEquals( "Correct edge returned", edge1, returned.next() );
assertFalse( "No more edges", returned.hasNext() );
search = createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getVersion(), null );
@@ -302,7 +306,7 @@ public class EdgeManagerIT {
public void testWriteReadEdgeTypeVersionTargetDistinct() {
- EdgeManager em = emf.createEdgeManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
final UUID earlyVersion = UUIDGenerator.newTimeUUID();
@@ -335,6 +339,8 @@ public class EdgeManagerIT {
Iterator<Edge> returned = edges.toBlockingObservable().getIterator();
assertEquals( "Correct edge returned", edge3, returned.next() );
+ assertEquals( "Correct edge returned", edge2, returned.next() );
+ assertEquals( "Correct edge returned", edge1, returned.next() );
assertFalse( "No more edges", returned.hasNext() );
//now test with an earlier version, we shouldn't get the edge back
@@ -345,6 +351,7 @@ public class EdgeManagerIT {
returned = edges.toBlockingObservable().getIterator();
assertEquals( "Correct edge returned", edge2, returned.next() );
+ assertEquals( "Correct edge returned", edge1, returned.next() );
assertFalse( "No more edges", returned.hasNext() );
search = createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge1.getVersion(), null );
@@ -370,7 +377,7 @@ public class EdgeManagerIT {
@Test
public void testWriteReadEdgeTypePagingSource() {
- EdgeManager em = emf.createEdgeManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
final Id sourceId = createId( "source" );
@@ -391,7 +398,7 @@ public class EdgeManagerIT {
//now test retrieving it
SearchByEdgeType search =
- createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getVersion(), null );
+ createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getVersion(), null );
Observable<Edge> edges = em.loadEdgesFromSource( search );
@@ -400,10 +407,15 @@ public class EdgeManagerIT {
//we have 3 edges, but we specified our first edge as the max, we shouldn't get any more results than the first
+ assertEquals( "Correct edge returned", edge3, returned.next() );
+
+ assertEquals( "Correct edge returned", edge2, returned.next() );
+
assertEquals( "Correct edge returned", edge1, returned.next() );
assertFalse( "No more edges", returned.hasNext() );
+ //still edge 3 is our max version, but we start with edge 2 as our last read
search = createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getVersion(), edge2 );
edges = em.loadEdgesFromSource( search );
@@ -411,7 +423,7 @@ public class EdgeManagerIT {
//implicitly blows up if more than 1 is returned from "single"
returned = edges.toBlockingObservable().getIterator();
- assertEquals( "Paged correctly", edge3, returned.next() );
+ assertEquals( "Paged correctly", edge1, returned.next() );
assertFalse( "End of stream", returned.hasNext() );
}
@@ -421,7 +433,7 @@ public class EdgeManagerIT {
public void testWriteReadEdgeTypePagingTarget() {
- EdgeManager em = emf.createEdgeManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
final Id targetId = createId( "target" );
@@ -442,7 +454,7 @@ public class EdgeManagerIT {
//now test retrieving it
SearchByEdgeType search =
- createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge1.getVersion(), null );
+ createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getVersion(), null );
Observable<Edge> edges = em.loadEdgesToTarget( search );
@@ -451,6 +463,10 @@ public class EdgeManagerIT {
//we have 3 edges, but we specified our first edge as the max, we shouldn't get any more results than the first
+ assertEquals( "Correct edge returned", edge3, returned.next() );
+
+ assertEquals( "Correct edge returned", edge2, returned.next() );
+
assertEquals( "Correct edge returned", edge1, returned.next() );
@@ -463,7 +479,7 @@ public class EdgeManagerIT {
//implicitly blows up if more than 1 is returned from "single"
returned = edges.toBlockingObservable().getIterator();
- assertEquals( "Paged correctly", edge3, returned.next() );
+ assertEquals( "Paged correctly", edge1, returned.next() );
assertFalse( "End of stream", returned.hasNext() );
}
@@ -472,7 +488,7 @@ public class EdgeManagerIT {
@Test
public void testWriteReadEdgeTypeTargetTypeSource() {
- EdgeManager em = emf.createEdgeManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
Edge edge = createEdge( "source", "test", "target" );
@@ -508,7 +524,7 @@ public class EdgeManagerIT {
@Test
public void testWriteReadEdgeTypeTargetTypeTarget() {
- EdgeManager em = emf.createEdgeManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
Edge edge = createEdge( "source", "test", "target" );
@@ -544,7 +560,7 @@ public class EdgeManagerIT {
@Test
public void testWriteReadEdgeDeleteSource() {
- EdgeManager em = emf.createEdgeManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
Edge edge = createEdge( "source", "test", "target" );
@@ -573,9 +589,20 @@ public class EdgeManagerIT {
assertEquals( "Correct edge returned", edge, returned );
+ final SearchByEdge searchByEdge = createGetByEdge(edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getVersion(), null);
+
+ returned = em.loadEdgeVersions(searchByEdge).toBlockingObservable().single();
+
+
+ assertEquals( "Correct edge returned", edge, returned );
+
//now delete it
- em.deleteEdge( edge ).toBlockingObservable().last();
+ returned = em.deleteEdge( edge ).toBlockingObservable().last();
+
+
+ assertEquals( "Correct edge returned", edge, returned );
+
//now test retrieval, should be null
edges = em.loadEdgesFromSource( search );
@@ -586,6 +613,7 @@ public class EdgeManagerIT {
assertNull( "No edge returned", returned );
+
//no search by type, should be null as well
edges = em.loadEdgesFromSourceByType( searchById );
@@ -594,13 +622,17 @@ public class EdgeManagerIT {
returned = edges.toBlockingObservable().singleOrDefault( null );
assertNull( "No edge returned", returned );
+
+ returned = em.loadEdgeVersions(searchByEdge).toBlockingObservable().singleOrDefault(null);
+
+ assertNull( "No edge returned", returned );
}
@Test
public void testWriteReadEdgeDeleteTarget() {
- EdgeManager em = emf.createEdgeManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
Edge edge = createEdge( "source", "test", "target" );
@@ -656,7 +688,7 @@ public class EdgeManagerIT {
@Test
public void testWriteReadEdgeTypesSourceTypes() {
- final EdgeManager em = emf.createEdgeManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
Id sourceId = new SimpleId( "source" );
Id targetId1 = new SimpleId( "target" );
@@ -718,7 +750,7 @@ public class EdgeManagerIT {
@Test
public void testWriteReadEdgeTypesTargetTypes() {
- final EdgeManager em = emf.createEdgeManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
Id sourceId1 = new SimpleId( "source" );
Id sourceId2 = new SimpleId( "source2" );
@@ -783,7 +815,7 @@ public class EdgeManagerIT {
@Test
public void testWriteReadEdgeTypesSourceTypesPaging() {
- final EdgeManager em = emf.createEdgeManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
Id sourceId1 = new SimpleId( "source" );
Id targetId1 = new SimpleId( "target" );
@@ -861,7 +893,7 @@ public class EdgeManagerIT {
@Test
public void testWriteReadEdgeTypesTargetTypesPaging() {
- final EdgeManager em = emf.createEdgeManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
Id sourceId1 = new SimpleId( "source" );
Id sourceId2 = new SimpleId( "source2" );
@@ -942,7 +974,7 @@ public class EdgeManagerIT {
@Test
public void testMarkSourceEdges() {
- final EdgeManager em = emf.createEdgeManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
Id sourceId = new SimpleId( "source" );
Id targetId1 = new SimpleId( "target" );
@@ -968,10 +1000,10 @@ public class EdgeManagerIT {
Iterator<Edge> results = edges.toBlockingObservable().getIterator();
- assertEquals( "Edges correct", edge1, results.next() );
-
assertEquals( "Edges correct", edge2, results.next() );
+ assertEquals( "Edges correct", edge1, results.next() );
+
assertFalse( "No more edges", results.hasNext() );
//now delete one of the edges
@@ -1011,7 +1043,7 @@ public class EdgeManagerIT {
@Test
public void testMarkTargetEdges() {
- final EdgeManager em = emf.createEdgeManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
Id sourceId1 = new SimpleId( "source" );
Id sourceId2 = new SimpleId( "source2" );
@@ -1037,9 +1069,10 @@ public class EdgeManagerIT {
Iterator<Edge> results = edges.toBlockingObservable().getIterator();
+ assertEquals( "Edges correct", edge2, results.next() );
+
assertEquals( "Edges correct", edge1, results.next() );
- assertEquals( "Edges correct", edge2, results.next() );
assertFalse( "No more edges", results.hasNext() );
@@ -1078,7 +1111,7 @@ public class EdgeManagerIT {
@Test
public void testMarkSourceEdgesType() {
- final EdgeManager em = emf.createEdgeManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
Id sourceId = new SimpleId( "source" );
Id targetId1 = new SimpleId( "target" );
@@ -1156,7 +1189,7 @@ public class EdgeManagerIT {
@Test
public void testMarkTargetEdgesType() {
- final EdgeManager em = emf.createEdgeManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
Id sourceId1 = new SimpleId( "source" );
Id sourceId2 = new SimpleId( "source2" );
@@ -1234,7 +1267,7 @@ public class EdgeManagerIT {
@Test
public void markSourceNode() {
- final EdgeManager em = emf.createEdgeManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
Id sourceId = new SimpleId( "source" );
Id targetId1 = new SimpleId( "target" );
@@ -1256,10 +1289,10 @@ public class EdgeManagerIT {
.toBlockingObservable().getIterator();
- assertEquals( "Edge found", edge1, results.next() );
-
assertEquals( "Edge found", edge2, results.next() );
+ assertEquals( "Edge found", edge1, results.next() );
+
assertFalse( "No more edges", results.hasNext() );
@@ -1313,12 +1346,10 @@ public class EdgeManagerIT {
}
-
-
@Test
public void markTargetNode() {
- final EdgeManager em = emf.createEdgeManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
Id sourceId1 = new SimpleId( "source" );
Id sourceId2 = new SimpleId( "source2" );
@@ -1340,10 +1371,10 @@ public class EdgeManagerIT {
.toBlockingObservable().getIterator();
- assertEquals( "Edge found", edge1, results.next() );
-
assertEquals( "Edge found", edge2, results.next() );
+ assertEquals( "Edge found", edge1, results.next() );
+
assertFalse( "No more edges", results.hasNext() );
@@ -1397,10 +1428,9 @@ public class EdgeManagerIT {
}
-
@Test(expected = NullPointerException.class)
public void invalidEdgeTypesWrite( @All Edge edge ) {
- final EdgeManager em = emf.createEdgeManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
em.writeEdge( edge );
}
@@ -1408,7 +1438,7 @@ public class EdgeManagerIT {
@Test(expected = NullPointerException.class)
public void invalidEdgeTypesDelete( @All Edge edge ) {
- final EdgeManager em = emf.createEdgeManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
em.deleteEdge( edge );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerStressTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerStressTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerStressTest.java
index 773e3ee..9d8db6e 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerStressTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerStressTest.java
@@ -28,6 +28,7 @@ import org.jukito.JukitoRunner;
import org.jukito.UseModules;
import org.junit.Before;
import org.junit.ClassRule;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -58,13 +59,13 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-@RunWith( JukitoRunner.class )
-@UseModules( TestGraphModule.class )
+@RunWith(JukitoRunner.class)
+@UseModules(TestGraphModule.class)
public class EdgeManagerStressTest {
private static final Logger log = LoggerFactory.getLogger( EdgeManagerStressTest.class );
@Inject
- private EdgeManagerFactory factory;
+ private GraphManagerFactory factory;
@ClassRule
public static CassandraRule rule = new CassandraRule();
@@ -91,6 +92,7 @@ public class EdgeManagerStressTest {
@Test
+ @Ignore
public void writeThousands() throws InterruptedException {
EdgeGenerator generator = new EdgeGenerator() {
@@ -108,7 +110,7 @@ public class EdgeManagerStressTest {
@Override
- public Observable<Edge> doSearch( final EdgeManager manager ) {
+ public Observable<Edge> doSearch( final GraphManager manager ) {
final UUID uuid = UUIDGenerator.newTimeUUID();
@@ -176,6 +178,8 @@ public class EdgeManagerStressTest {
doTest( generator );
}
+
+ @Ignore
@Test
public void writeThousandsSingleSource() throws InterruptedException {
EdgeGenerator generator = new EdgeGenerator() {
@@ -193,7 +197,7 @@ public class EdgeManagerStressTest {
@Override
- public Observable<Edge> doSearch( final EdgeManager manager ) {
+ public Observable<Edge> doSearch( final GraphManager manager ) {
UUID uuid = UUIDGenerator.newTimeUUID();
return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", uuid, null ) );
@@ -203,41 +207,41 @@ public class EdgeManagerStressTest {
doTest( generator );
}
+
@Test
- public void writeThousandsSingleTarget() throws InterruptedException {
- EdgeGenerator generator = new EdgeGenerator() {
+ @Ignore
+ public void writeThousandsSingleTarget() throws InterruptedException {
+ EdgeGenerator generator = new EdgeGenerator() {
- private Id targetId = createId( "target" );
+ private Id targetId = createId( "target" );
- @Override
- public Edge newEdge() {
- Edge edge = createEdge( createId( "source" ), "test", targetId );
+ @Override
+ public Edge newEdge() {
+ Edge edge = createEdge( createId( "source" ), "test", targetId );
- return edge;
- }
+ return edge;
+ }
- @Override
- public Observable<Edge> doSearch( final EdgeManager manager ) {
- UUID uuid = UUIDGenerator.newTimeUUID();
+ @Override
+ public Observable<Edge> doSearch( final GraphManager manager ) {
+ UUID uuid = UUIDGenerator.newTimeUUID();
- return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", uuid, null ) );
- }
- };
+ return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", uuid, null ) );
+ }
+ };
- doTest( generator );
- }
+ doTest( generator );
+ }
/**
* Execute the test with the generator
- * @param generator
- * @throws InterruptedException
*/
private void doTest( EdgeGenerator generator ) throws InterruptedException {
- EdgeManager manager = factory.createEdgeManager( scope );
+ GraphManager manager = factory.createEdgeManager( scope );
int limit = 10000;
@@ -308,6 +312,6 @@ public class EdgeManagerStressTest {
*/
public Edge newEdge();
- public Observable<Edge> doSearch( final EdgeManager manager );
+ public Observable<Edge> doSearch( final GraphManager manager );
}
}