You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/27 22:21:30 UTC
[16/50] [abbrv] git commit: Merge with hystrix. Added debug logging
Merge with hystrix. Added debug logging
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/029c0650
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/029c0650
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/029c0650
Branch: refs/pull/77/head
Commit: 029c06502c7359117a49638a7a198375dc0bb868
Parents: 136edab
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 25 17:32:31 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 25 17:32:31 2014 -0700
----------------------------------------------------------------------
.../graph/impl/NodeDeleteListener.java | 35 +++---
.../impl/EdgeSerializationImpl.java | 16 +--
.../impl/parse/ObservableIterator.java | 12 +-
.../graph/GraphManagerTimeoutIT.java | 48 ++++----
.../graph/impl/NodeDeleteListenerTest.java | 41 ++++---
.../serialization/EdgeSerializationTest.java | 93 ++++++++++-----
.../graph/serialization/TestCount.java | 117 +++++++++++++++++++
.../graph/test/util/EdgeTestUtils.java | 12 +-
stack/corepersistence/pom.xml | 2 +-
9 files changed, 271 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/029c0650/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index 4ba0142..d0c0bec 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -53,30 +53,20 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
private final EdgeMetadataSerialization edgeMetadataSerialization;
private final EdgeDeleteRepair edgeDeleteRepair;
private final EdgeMetaRepair edgeMetaRepair;
-<<<<<<< Updated upstream
-=======
private final GraphFig graphFig;
protected final Keyspace keyspace;
->>>>>>> Stashed changes
/**
* Wire the serialization dependencies
*/
@Inject
-<<<<<<< Updated upstream
- public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization, @NodeDelete final AsyncProcessor nodeDelete,
- final EdgeMetadataSerialization edgeMetadataSerialization,
- final EdgeDeleteRepair edgeDeleteRepair, final EdgeMetaRepair edgeMetaRepair
- ) {
-=======
public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization,
final EdgeMetadataSerialization edgeMetadataSerialization,
final EdgeDeleteRepair edgeDeleteRepair, final EdgeMetaRepair edgeMetaRepair,
final GraphFig graphFig, @NodeDelete final AsyncProcessor nodeDelete,
final Keyspace keyspace ) {
->>>>>>> Stashed changes
this.nodeSerialization = nodeSerialization;
@@ -163,11 +153,13 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
@Override
public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
+ LOG.debug( "Batching {} edges for deletion" , markedEdges.size());
+
final MutationBatch batch = keyspace.prepareMutationBatch();
for(MarkedEdge edge: markedEdges){
- //delete the newest edge <= the version on the node delete
+ //delete the newest edge <= the version on the node delete
LOG.debug( "Deleting edge {}", edge );
final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
@@ -184,12 +176,17 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
return Observable.from(markedEdges);
}
} )
- .flatMap( new Func1<MarkedEdge, Observable<MarkedEdge>>() {
- @Override
- public Observable<MarkedEdge> call( final MarkedEdge edge ) {
+ //TODO Fix this
+// .flatMap( new Func1<MarkedEdge, Observable<MarkedEdge>>() {
+// @Override
+// public Observable<MarkedEdge> call( final MarkedEdge edge ) {
+//
+//
+// return Observable.just( edge );
+
+
- return Observable.just( edge );
// //delete both the source and target meta data in parallel for the edge we deleted in the previous step
// //if nothing else is using them
// Observable<Integer> sourceMetaRepaired =
@@ -209,8 +206,12 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
// return edge;
// }
// } );
- }
- } ).count()
+
+
+// }
+// })
+
+ .count()
//if nothing is ever emitted, emit 0 so that we know no operations took place. Finally remove the
// target node in the mark
.defaultIfEmpty( 0 ).doOnCompleted( new Action0() {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/029c0650/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index d255929..0341cac 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -156,8 +156,8 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
doWrite( scope, edge, new RowOp<RowKey>() {
@Override
- public <R extends RowKey> void doWrite(
- final MultiTennantColumnFamily<OrganizationScope, R, DirectedEdge> columnFamily, final R rowKey,
+ public void doWrite(
+ final MultiTennantColumnFamily<OrganizationScope, RowKey, DirectedEdge> columnFamily, final RowKey rowKey,
final DirectedEdge edge ) {
batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( edge, false );
}
@@ -182,8 +182,8 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
doWrite( scope, edge, new RowOp<RowKey>() {
@Override
- public <R extends RowKey> void doWrite(
- final MultiTennantColumnFamily<OrganizationScope, R, DirectedEdge> columnFamily, final R rowKey,
+ public void doWrite(
+ final MultiTennantColumnFamily<OrganizationScope, RowKey, DirectedEdge> columnFamily, final RowKey rowKey,
final DirectedEdge edge ) {
batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( edge, true );
}
@@ -208,8 +208,8 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
doWrite( scope, edge, new RowOp<RowKey>() {
@Override
- public <R extends RowKey> void doWrite(
- final MultiTennantColumnFamily<OrganizationScope, R, DirectedEdge> columnFamily, final R rowKey,
+ public void doWrite(
+ final MultiTennantColumnFamily<OrganizationScope, RowKey, DirectedEdge> columnFamily, final RowKey rowKey,
final DirectedEdge edge ) {
batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( edge );
}
@@ -493,7 +493,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
.autoPaginate( true ).withColumnRange( rangeBuilder.build() );
- return new ColumnNameIterator<DirectedEdge, MarkedEdge>( query, searcher, searcher.hasPage() );
+ return new ColumnNameIterator<DirectedEdge, MarkedEdge>( query, searcher, searcher.hasPage(), graphFig.getReadTimeout() );
}
@@ -837,7 +837,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
/**
* Write the edge with the given data
*/
- <R extends RowKey> void doWrite(
+ void doWrite(
final MultiTennantColumnFamily<OrganizationScope, R, DirectedEdge> columnFamily, R rowKey,
DirectedEdge edge );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/029c0650/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
index 2274868..2d2456b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
@@ -3,6 +3,10 @@ package org.apache.usergrid.persistence.graph.serialization.impl.parse;
import java.util.Iterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
@@ -17,6 +21,8 @@ import rx.subscriptions.Subscriptions;
*/
public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T> {
+ private static final Logger log = LoggerFactory.getLogger( ObservableIterator.class );
+
@Override
public void call( final Subscriber<? super T> subscriber ) {
@@ -29,7 +35,11 @@ public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T>
//while we have items to emit and our subscriber is subscribed, we want to keep emitting items
while ( itr.hasNext() && !subscriber.isUnsubscribed()) {
- subscriber.onNext( itr.next() );
+ final T next = itr.next();
+
+ log.debug( "Emitting {}", next );
+
+ subscriber.onNext( next );
}
subscriber.onCompleted();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/029c0650/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerTimeoutIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerTimeoutIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerTimeoutIT.java
index 509fe6a..ed8c1b8 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerTimeoutIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerTimeoutIT.java
@@ -119,7 +119,7 @@ public class GraphManagerTimeoutIT {
public void testWriteReadEdgeTypeSource( EdgeSerialization serialization ) throws InterruptedException {
- final GraphManager em = emf.createGraphManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
final MarkedEdge edge = createEdge( "source", "edge", "target" );
@@ -223,7 +223,7 @@ public class GraphManagerTimeoutIT {
@Test
public void testWriteReadEdgeTypeTarget() {
- GraphManager em = emf.createGraphManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
Edge edge = createEdge( "source", "test", "target" );
@@ -256,7 +256,7 @@ public class GraphManagerTimeoutIT {
@Test
public void testWriteReadEdgeTypeVersionSource() {
- GraphManager em = emf.createGraphManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
final UUID earlyVersion = UUIDGenerator.newTimeUUID();
@@ -291,7 +291,7 @@ public class GraphManagerTimeoutIT {
@Test
public void testWriteReadEdgeTypeVersionTarget() {
- GraphManager em = emf.createGraphManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
final UUID earlyVersion = UUIDGenerator.newTimeUUID();
@@ -330,7 +330,7 @@ public class GraphManagerTimeoutIT {
@Test
public void testWriteReadEdgeTypeVersionSourceDistinct() {
- GraphManager em = emf.createGraphManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
final UUID earlyVersion = UUIDGenerator.newTimeUUID();
@@ -399,7 +399,7 @@ public class GraphManagerTimeoutIT {
public void testWriteReadEdgeTypeVersionTargetDistinct() {
- GraphManager em = emf.createGraphManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
final UUID earlyVersion = UUIDGenerator.newTimeUUID();
@@ -467,7 +467,7 @@ public class GraphManagerTimeoutIT {
@Test
public void testWriteReadEdgeTypePagingSource() {
- GraphManager em = emf.createGraphManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
final Id sourceId = createId( "source" );
@@ -518,7 +518,7 @@ public class GraphManagerTimeoutIT {
public void testWriteReadEdgeTypePagingTarget() {
- GraphManager em = emf.createGraphManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
final Id targetId = createId( "target" );
@@ -569,7 +569,7 @@ public class GraphManagerTimeoutIT {
@Test
public void testWriteReadEdgeTypeTargetTypeSource() {
- GraphManager em = emf.createGraphManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
Edge edge = createEdge( "source", "test", "target" );
@@ -605,7 +605,7 @@ public class GraphManagerTimeoutIT {
@Test
public void testWriteReadEdgeTypeTargetTypeTarget() {
- GraphManager em = emf.createGraphManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
Edge edge = createEdge( "source", "test", "target" );
@@ -641,7 +641,7 @@ public class GraphManagerTimeoutIT {
@Test
public void testWriteReadEdgeDeleteSource() {
- GraphManager em = emf.createGraphManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
Edge edge = createEdge( "source", "test", "target" );
@@ -697,7 +697,7 @@ public class GraphManagerTimeoutIT {
@Test
public void testWriteReadEdgeDeleteTarget() {
- GraphManager em = emf.createGraphManager( scope );
+ GraphManager em = emf.createEdgeManager( scope );
Edge edge = createEdge( "source", "test", "target" );
@@ -753,7 +753,7 @@ public class GraphManagerTimeoutIT {
@Test
public void testWriteReadEdgeTypesSourceTypes() {
- final GraphManager em = emf.createGraphManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
Id sourceId = new SimpleId( "source" );
Id targetId1 = new SimpleId( "target" );
@@ -815,7 +815,7 @@ public class GraphManagerTimeoutIT {
@Test
public void testWriteReadEdgeTypesTargetTypes() {
- final GraphManager em = emf.createGraphManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
Id sourceId1 = new SimpleId( "source" );
Id sourceId2 = new SimpleId( "source2" );
@@ -880,7 +880,7 @@ public class GraphManagerTimeoutIT {
@Test
public void testWriteReadEdgeTypesSourceTypesPaging() {
- final GraphManager em = emf.createGraphManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
Id sourceId1 = new SimpleId( "source" );
Id targetId1 = new SimpleId( "target" );
@@ -958,7 +958,7 @@ public class GraphManagerTimeoutIT {
@Test
public void testWriteReadEdgeTypesTargetTypesPaging() {
- final GraphManager em = emf.createGraphManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
Id sourceId1 = new SimpleId( "source" );
Id sourceId2 = new SimpleId( "source2" );
@@ -1039,7 +1039,7 @@ public class GraphManagerTimeoutIT {
@Test
public void testMarkSourceEdges() {
- final GraphManager em = emf.createGraphManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
Id sourceId = new SimpleId( "source" );
Id targetId1 = new SimpleId( "target" );
@@ -1108,7 +1108,7 @@ public class GraphManagerTimeoutIT {
@Test
public void testMarkTargetEdges() {
- final GraphManager em = emf.createGraphManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
Id sourceId1 = new SimpleId( "source" );
Id sourceId2 = new SimpleId( "source2" );
@@ -1175,7 +1175,7 @@ public class GraphManagerTimeoutIT {
@Test
public void testMarkSourceEdgesType() {
- final GraphManager em = emf.createGraphManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
Id sourceId = new SimpleId( "source" );
Id targetId1 = new SimpleId( "target" );
@@ -1253,7 +1253,7 @@ public class GraphManagerTimeoutIT {
@Test
public void testMarkTargetEdgesType() {
- final GraphManager em = emf.createGraphManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
Id sourceId1 = new SimpleId( "source" );
Id sourceId2 = new SimpleId( "source2" );
@@ -1331,7 +1331,7 @@ public class GraphManagerTimeoutIT {
@Test
public void markSourceNode() {
- final GraphManager em = emf.createGraphManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
Id sourceId = new SimpleId( "source" );
Id targetId1 = new SimpleId( "target" );
@@ -1413,7 +1413,7 @@ public class GraphManagerTimeoutIT {
@Test
public void markTargetNode() {
- final GraphManager em = emf.createGraphManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
Id sourceId1 = new SimpleId( "source" );
Id sourceId2 = new SimpleId( "source2" );
@@ -1494,7 +1494,7 @@ public class GraphManagerTimeoutIT {
@Test( expected = NullPointerException.class )
public void invalidEdgeTypesWrite( @All Edge edge ) {
- final GraphManager em = emf.createGraphManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
em.writeEdge( edge );
}
@@ -1502,7 +1502,7 @@ public class GraphManagerTimeoutIT {
@Test( expected = NullPointerException.class )
public void invalidEdgeTypesDelete( @All Edge edge ) {
- final GraphManager em = emf.createGraphManager( scope );
+ final GraphManager em = emf.createEdgeManager( scope );
em.deleteEdge( edge );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/029c0650/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
index e735aea..48890ca 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
@@ -11,6 +11,8 @@ import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.OrganizationScope;
import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
@@ -50,6 +52,8 @@ import static org.mockito.Mockito.when;
@UseModules( { TestGraphModule.class } )
public class NodeDeleteListenerTest {
+ private static final Logger log = LoggerFactory.getLogger( NodeDeleteListenerTest.class );
+
@ClassRule
public static CassandraRule rule = new CassandraRule();
@@ -309,8 +313,8 @@ public class NodeDeleteListenerTest {
/**
- * Simple test case that tests a single edge and removing the node. The other target node should be removed as
- * well since it has no other targets
+ * Simple test case that tests a single edge and removing the node. The other target node should be removed as well
+ * since it has no other targets
*/
@Test
public void testMultiDelete() throws ConnectionException {
@@ -324,17 +328,21 @@ public class NodeDeleteListenerTest {
final String edgeType = "test";
int countSaved = 0;
+ int sourceCount = 0;
+ int targetCount = 0;
for ( int i = 0; i < edgeCount; i++ ) {
- Edge edge ;
+ Edge edge;
//mix up source vs target, good for testing as well as create a lot of sub types to ensure they're removed
if ( i % 2 == 0 ) {
- edge = createEdge( toDelete, edgeType, createId( "target"+Math.random() ) );
+ edge = createEdge( toDelete, edgeType, createId( "target" + Math.random() ) );
+ sourceCount++;
}
else {
- edge = createEdge( createId( "source"+Math.random() ), edgeType, toDelete );
+ edge = createEdge( createId( "source" + Math.random() ), edgeType, toDelete );
+ targetCount++;
}
//write the edge
@@ -346,13 +354,15 @@ public class NodeDeleteListenerTest {
countSaved++;
}
- assertEquals(edgeCount, countSaved);
+ assertEquals( edgeCount, countSaved );
+
+ log.info( "Saved {} source edges", sourceCount );
+ log.info( "Saved {} target edges", targetCount );
//mark the node so
-// UUID deleteVersion = UUIDGenerator.newTimeUUID();
+ UUID deleteVersion = UUIDGenerator.newTimeUUID();
- UUID deleteVersion = UUID.fromString( "ffffffff-ffff-1fff-bfff-ffffffffffff" );
nodeSerialization.mark( scope, toDelete, deleteVersion ).execute();
@@ -368,21 +378,19 @@ public class NodeDeleteListenerTest {
UUID now = UUIDGenerator.newTimeUUID();
- Iterator<MarkedEdge> returned = edgeSerialization
- .getEdgesFromSource( scope, createSearchByEdge( toDelete, edgeType, now, null ) );
+ Iterator<MarkedEdge> returned =
+ edgeSerialization.getEdgesFromSource( scope, createSearchByEdge( toDelete, edgeType, now, null ) );
//no edge from source node should be returned
assertFalse( "No source should be returned", returned.hasNext() );
//validate it's not returned by the
- returned = edgeSerialization
- .getEdgesToTarget( scope, createSearchByEdge( toDelete, edgeType, now, null ) );
+ returned = edgeSerialization.getEdgesToTarget( scope, createSearchByEdge( toDelete, edgeType, now, null ) );
assertFalse( "No target should be returned", returned.hasNext() );
-
//no types from source
Iterator<String> types =
@@ -400,14 +408,13 @@ public class NodeDeleteListenerTest {
//no target types from source
- Iterator<String> idTypes = edgeMetadataSerialization
- .getIdTypesFromSource( scope, createSearchIdType( toDelete, edgeType, null ) );
+ Iterator<String> idTypes =
+ edgeMetadataSerialization.getIdTypesFromSource( scope, createSearchIdType( toDelete, edgeType, null ) );
assertFalse( idTypes.hasNext() );
//no source types to target
- idTypes = edgeMetadataSerialization
- .getIdTypesToTarget( scope, createSearchIdType( toDelete, edgeType, null ) );
+ idTypes = edgeMetadataSerialization.getIdTypesToTarget( scope, createSearchIdType( toDelete, edgeType, null ) );
assertFalse( idTypes.hasNext() );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/029c0650/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java
index 776b9df..9e016b2 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java
@@ -13,6 +13,8 @@ import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.OrganizationScope;
import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
@@ -27,6 +29,8 @@ import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import com.fasterxml.uuid.UUIDComparator;
import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
@@ -45,10 +49,12 @@ import static org.mockito.Mockito.when;
*
*
*/
-@RunWith(JukitoRunner.class)
-@UseModules({ TestGraphModule.class })
+@RunWith( JukitoRunner.class )
+@UseModules( { TestGraphModule.class } )
public class EdgeSerializationTest {
+ private static final Logger log = LoggerFactory.getLogger( EdgeSerializationTest.class );
+
@ClassRule
public static CassandraRule rule = new CassandraRule();
@@ -64,6 +70,9 @@ public class EdgeSerializationTest {
@Inject
protected GraphFig graphFig;
+ @Inject
+ protected Keyspace keyspace;
+
protected OrganizationScope scope;
@@ -617,51 +626,83 @@ public class EdgeSerializationTest {
}
-
/**
* Tests mixing 2 edge types between 2 nodes. We should get results for the same source->destination with the 2
* edge types
*/
@Test
public void testIteratorPaging() throws ConnectionException {
- final Edge edgev1 = createEdge( "source", "edge1", "target" );
- final Id sourceId = edgev1.getSourceNode();
- final Id targetId = edgev1.getTargetNode();
+ final Id sourceId = createId( "source" );
+ final String edgeType = "edge";
+ final Id targetId = createId( "target" );
- final Edge edgev2 = createEdge( sourceId, "edge1", targetId );
- assertTrue( "Edge version 1 has lower time uuid",
- UUIDComparator.staticCompare( edgev1.getVersion(), edgev2.getVersion() ) < 0 );
+ int writeCount = graphFig.getScanPageSize() * 3;
- //create edge type 2 to ensure we don't get it in results
- final Edge edgeType2V1 = createEdge( sourceId, "edge2", targetId );
- serialization.writeEdge( scope, edgev1 ).execute();
- serialization.writeEdge( scope, edgev2 ).execute();
- serialization.writeEdge( scope, edgeType2V1 ).execute();
+ final MutationBatch batch = keyspace.prepareMutationBatch();
- final UUID now = UUIDGenerator.newTimeUUID();
+ UUID lastMax = null;
+ for ( int i = 0; i < writeCount; i++ ) {
- SearchByEdge search = createGetByEdge( sourceId, "edge1", targetId, now, null );
+ final Edge edge = createEdge( sourceId, edgeType, targetId );
- Iterator<MarkedEdge> results = serialization.getEdgeVersions( scope, search );
+ lastMax = edge.getVersion();
- assertEquals( edgev2, results.next() );
- assertEquals( edgev1, results.next() );
- assertFalse( "No results should be returned", results.hasNext() );
+ batch.mergeShallow( serialization.writeEdge( scope, edge ) );
+ }
- //max version test
+ log.info( "Flushing edges" );
+ batch.execute();
- //test max version
- search = createGetByEdge( sourceId, "edge1", targetId, edgev1.getVersion(), null );
- results = serialization.getEdgeVersions( scope, search );
+ Iterator<MarkedEdge> results =
+ serialization.getEdgeVersions( scope, createGetByEdge( sourceId, edgeType, targetId, lastMax, null ) );
- assertEquals( edgev1, results.next() );
- assertFalse( "Max version was honored", results.hasNext() );
+ verify( results, writeCount );
+
+
+
+ //get them all from source
+ results = serialization.getEdgesFromSource( scope, createSearchByEdge( sourceId, edgeType, lastMax, null ) );
+
+ verify( results, writeCount );
+
+
+
+ results = serialization.getEdgesFromSourceByTargetType( scope,
+ createSearchByEdgeAndId( sourceId, edgeType, lastMax, targetId.getType(), null ) );
+
+ verify( results, writeCount );
+
+
+
+ results = serialization.getEdgesToTarget( scope, createSearchByEdge( targetId, edgeType, lastMax, null ) );
+
+ verify( results, writeCount );
+
+
+
+ results = serialization.getEdgesToTargetBySourceType( scope,
+ createSearchByEdgeAndId( targetId, edgeType, lastMax, sourceId.getType(), null ) );
+
+ verify( results, writeCount );
+ }
+
+
+ private void verify( Iterator<MarkedEdge> results, int expectedCount ) {
+ int count = 0;
+
+ while ( results.hasNext() ) {
+ count++;
+ results.next();
+ }
+ //results is now drained; count holds the number of edges the iterator returned
+
+ assertEquals( "All versions returned", expectedCount, count );
 }
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/029c0650/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/TestCount.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/TestCount.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/TestCount.java
new file mode 100644
index 0000000..7af55da
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/TestCount.java
@@ -0,0 +1,117 @@
+package org.apache.usergrid.persistence.graph.serialization;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Test;
+
+import rx.Observable;
+import rx.Subscriber;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Checks that count() sees every value when two observables subscribed on the io scheduler are merged,
+ * buffered into lists of up to 1000 and re-emitted through flatMap.
+ */
+public class TestCount {
+
+
+ @Test
+ public void mergeTest(){
+
+ final int sizePerObservable = 2000;
+
+ Observable<Integer> input1 = getObservables( sizePerObservable );
+ Observable<Integer> input2 = getObservables( sizePerObservable );
+
+ int returned = Observable.merge(input1, input2).buffer( 1000 ).flatMap(
+ new Func1<List<Integer>, Observable<Integer>>() {
+ @Override
+ public Observable<Integer> call( final List<Integer> integers ) {
+
+ //simulates a batched network operation on each buffered list, then re-emits the values it was passed
+
+ try {
+ Thread.sleep( 100 );
+ }
+ catch ( InterruptedException e ) {
+ throw new RuntimeException( e );
+ }
+
+
+ return Observable.from( integers );
+ }
+ } ).count().defaultIfEmpty( 0 ).toBlockingObservable().last();
+
+
+ assertEquals("Count was correct", sizePerObservable*2, returned);
+ }
+
+
+ /**
+ * Builds an observable, subscribed on the io scheduler, that emits the integers 0 to size - 1 in order
+ * @param size the number of values to emit
+ * @return the observable; it sleeps 250ms before each value whose index is a multiple of 1000 and 5s before the last
+ */
+ private Observable<Integer> getObservables( int size ){
+
+ final List<Integer> values = new ArrayList<Integer>(size);
+
+ for(int i = 0; i <size; i ++ ) {
+ values.add( i );
+ }
+
+
+ /**
+ * Simulates occasional sleeps while we fetch something over the network
+ */
+ return Observable.create( new Observable.OnSubscribe<Integer>() {
+ @Override
+ public void call( final Subscriber<? super Integer> subscriber ) {
+
+ final int size = values.size();
+
+ for(int i = 0; i < size; i ++){
+
+
+
+ if(i%1000 == 0){
+ //simulate a network fetch before every index that is a multiple of 1000, including the first
+ try {
+ Thread.sleep( 250 );
+ }
+ catch ( InterruptedException e ) {
+ subscriber.onError( e );
+ return;
+ }
+ }
+
+ //sleep for 5 seconds before emitting the last value
+ if(i == size -1){
+ try {
+ Thread.sleep(5000);
+ }
+ catch ( InterruptedException e ) {
+ subscriber.onError( e );
+ return;
+ }
+ }
+
+
+ subscriber.onNext( values.get( i ) );
+ }
+
+ subscriber.onCompleted();
+
+ //purposefully no error handling here
+ }
+ } ).subscribeOn( Schedulers.io() );
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/029c0650/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
index 68a67d2..27b9a38 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
@@ -70,22 +70,12 @@ public class EdgeTestUtils {
/**
* Create an edge with the specified params
*/
- public static MarkedEdge createEdge( final Id sourceId, final String edgeType, final Id targetId,
- final UUID version ) {
+ public static MarkedEdge createEdge( final Id sourceId, final String edgeType, final Id targetId, final UUID version ) {
return new SimpleMarkedEdge( sourceId, edgeType, targetId, version, false );
}
/**
- * Create an edge with the specified params
- */
- public static MarkedEdge createEdge( final String sourceType, final String edgeType, final String targetType,
- final UUID version ) {
- return new SimpleMarkedEdge( createId( sourceType ), edgeType, createId( targetType ), version, false );
- }
-
-
- /**
* Create the id
*/
public static Id createId( String type ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/029c0650/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 2beddd5..9b7b1e1 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -75,7 +75,7 @@
<systemPropertyVariables>
<archaius.deployment.environment>UNIT</archaius.deployment.environment>
</systemPropertyVariables>
- <argLine>-Xms2G -Xmx2G</argLine>
+ <argLine>-Xms2G -Xmx4G</argLine>
</configuration>
</plugin>