You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/05 00:15:02 UTC
[09/11] Checkpoint, still a WIP. Broken stuff.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ecd603f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index 42b3dc2..8c60e2c 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -20,9 +20,9 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
-import java.util.List;
import org.junit.Before;
import org.junit.Test;
@@ -33,6 +33,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByIdType;
+import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -46,6 +47,7 @@ import static junit.framework.TestCase.assertTrue;
import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
@@ -77,21 +79,22 @@ public class NodeShardAllocationTest {
when( graphFig.getShardCacheSize() ).thenReturn( 10000l );
when( graphFig.getShardSize() ).thenReturn( 20000l );
- when( graphFig.getShardCacheTimeout()).thenReturn( 30000l );
- }
+ final long timeout = 30000;
+ when( graphFig.getShardCacheTimeout() ).thenReturn( timeout );
+ when( graphFig.getShardMinDelta() ).thenReturn( timeout * 2 );
+ }
@Test
public void minTime() {
final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
- final EdgeColumnFamilies edgeColumnFamilies = mock(EdgeColumnFamilies.class);
+ final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
- final ShardedEdgeSerialization shardedEdgeSerialization = mock(ShardedEdgeSerialization.class);
+ final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
- final NodeShardApproximation nodeShardCounterSerialization =
- mock( NodeShardApproximation.class );
+ final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
final TimeService timeService = mock( TimeService.class );
@@ -100,8 +103,8 @@ public class NodeShardAllocationTest {
NodeShardAllocation approximation =
- new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, nodeShardCounterSerialization, timeService,
- graphFig, keyspace );
+ new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+ nodeShardCounterSerialization, timeService, graphFig, keyspace );
final long timeservicetime = System.currentTimeMillis();
@@ -112,7 +115,7 @@ public class NodeShardAllocationTest {
final long returned = approximation.getMinTime();
- assertEquals("Correct time was returned", expected, returned);
+ assertEquals( "Correct time was returned", expected, returned );
}
@@ -120,13 +123,12 @@ public class NodeShardAllocationTest {
public void noShards() {
final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
- final EdgeColumnFamilies edgeColumnFamilies = mock(EdgeColumnFamilies.class);
+ final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
- final ShardedEdgeSerialization shardedEdgeSerialization = mock(ShardedEdgeSerialization.class);
+ final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
- final NodeShardApproximation nodeShardCounterSerialization =
- mock( NodeShardApproximation.class );
+ final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
final TimeService timeService = mock( TimeService.class );
@@ -138,8 +140,8 @@ public class NodeShardAllocationTest {
when( keyspace.prepareMutationBatch() ).thenReturn( batch );
NodeShardAllocation approximation =
- new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, nodeShardCounterSerialization, timeService,
- graphFig, keyspace );
+ new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+ nodeShardCounterSerialization, timeService, graphFig, keyspace );
final Id nodeId = createId( "test" );
final String type = "type";
@@ -149,8 +151,8 @@ public class NodeShardAllocationTest {
* Mock up returning an empty iterator, our audit shouldn't create a new shard
*/
when( edgeShardSerialization
- .getEdgeMetaData( same( scope ), same( nodeId ), eq( NodeType.SOURCE ), any( Optional.class ), same( type ),
- same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
+ .getEdgeMetaData( same( scope ), same( nodeId ), eq( NodeType.SOURCE ), any( Optional.class ),
+ same( type ), same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.SOURCE, type, subType );
@@ -158,19 +160,15 @@ public class NodeShardAllocationTest {
}
-
-
-
@Test
public void existingFutureShardSameTime() {
final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
- final EdgeColumnFamilies edgeColumnFamilies = mock(EdgeColumnFamilies.class);
+ final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
- final ShardedEdgeSerialization shardedEdgeSerialization = mock(ShardedEdgeSerialization.class);
+ final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
- final NodeShardApproximation nodeShardCounterSerialization =
- mock( NodeShardApproximation.class );
+ final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
final TimeService timeService = mock( TimeService.class );
@@ -184,8 +182,8 @@ public class NodeShardAllocationTest {
NodeShardAllocation approximation =
- new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, nodeShardCounterSerialization, timeService,
- graphFig, keyspace );
+ new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+ nodeShardCounterSerialization, timeService, graphFig, keyspace );
final Id nodeId = createId( "test" );
final String type = "type";
@@ -196,16 +194,16 @@ public class NodeShardAllocationTest {
when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
- final Shard futureShard = new Shard(10000l, timeservicetime) ;
+ final Shard futureShard = new Shard( 10000l, timeservicetime );
/**
* Mock up returning a min shard, and a future shard
*/
when( edgeShardSerialization
- .getEdgeMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET), any( Optional.class ), same( type ),
- same( subType ) ) ).thenReturn( Arrays.asList( futureShard ).iterator() );
+ .getEdgeMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
+ same( type ), same( subType ) ) ).thenReturn( Arrays.asList( futureShard ).iterator() );
- final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.TARGET, type, subType );
+ final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.TARGET, type, subType );
assertFalse( "No shard allocated", result );
}
@@ -215,12 +213,11 @@ public class NodeShardAllocationTest {
public void lowCountFutureShard() {
final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
- final EdgeColumnFamilies edgeColumnFamilies = mock(EdgeColumnFamilies.class);
+ final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
- final ShardedEdgeSerialization shardedEdgeSerialization = mock(ShardedEdgeSerialization.class);
+ final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
- final NodeShardApproximation nodeShardApproximation =
- mock( NodeShardApproximation.class );
+ final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
final TimeService timeService = mock( TimeService.class );
@@ -233,8 +230,8 @@ public class NodeShardAllocationTest {
NodeShardAllocation approximation =
- new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, nodeShardApproximation, timeService,
- graphFig, keyspace );
+ new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+ nodeShardApproximation, timeService, graphFig, keyspace );
final Id nodeId = createId( "test" );
final String type = "type";
@@ -250,15 +247,16 @@ public class NodeShardAllocationTest {
* Mock up returning a min shard, and a future shard
*/
when( edgeShardSerialization
- .getEdgeMetaData( same( scope ), same( nodeId ), eq(NodeType.TARGET), any( Optional.class ), same( type ),
- same( subType ) ) ).thenReturn( Arrays.asList( new Shard(0l, 0l) ).iterator() );
+ .getEdgeMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
+ same( type ), same( subType ) ) ).thenReturn( Arrays.asList( new Shard( 0l, 0l ) ).iterator() );
//return a shard size < our max by 1
final long count = graphFig.getShardSize() - 1;
- when( nodeShardApproximation.getCount(scope, nodeId, NodeType.TARGET, 0l, type, subType )).thenReturn( count );
+ when( nodeShardApproximation.getCount( scope, nodeId, NodeType.TARGET, 0l, type, subType ) )
+ .thenReturn( count );
final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.TARGET, type, subType );
@@ -270,26 +268,25 @@ public class NodeShardAllocationTest {
public void equalCountFutureShard() {
final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
- final EdgeColumnFamilies edgeColumnFamilies = mock(EdgeColumnFamilies.class);
+ final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
- final ShardedEdgeSerialization shardedEdgeSerialization = mock(ShardedEdgeSerialization.class);
+ final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
- final NodeShardApproximation nodeShardApproximation =
- mock( NodeShardApproximation.class );
+ final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
final TimeService timeService = mock( TimeService.class );
final Keyspace keyspace = mock( Keyspace.class );
- final MutationBatch batch = mock(MutationBatch.class);
+ final MutationBatch batch = mock( MutationBatch.class );
- when(keyspace.prepareMutationBatch()).thenReturn( batch );
+ when( keyspace.prepareMutationBatch() ).thenReturn( batch );
NodeShardAllocation approximation =
- new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, nodeShardApproximation, timeService,
- graphFig, keyspace );
+ new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+ nodeShardApproximation, timeService, graphFig, keyspace );
final Id nodeId = createId( "test" );
final String type = "type";
@@ -305,39 +302,38 @@ public class NodeShardAllocationTest {
* Mock up returning a min shard
*/
when( edgeShardSerialization
- .getEdgeMetaData( same( scope ), same( nodeId ), eq(NodeType.SOURCE), any( Optional.class ), same( type ),
- same( subType ) ) ).thenReturn( Arrays.asList( new Shard( 0l, 0l ) ).iterator() );
+ .getEdgeMetaData( same( scope ), same( nodeId ), eq( NodeType.SOURCE ), any( Optional.class ),
+ same( type ), same( subType ) ) ).thenReturn( Arrays.asList( new Shard( 0l, 0l ) ).iterator() );
final long shardCount = graphFig.getShardSize();
//return a shard size equal to our max
- when( nodeShardApproximation
- .getCount( scope , nodeId, NodeType.SOURCE, 0l,type , subType ))
+ when( nodeShardApproximation.getCount( scope, nodeId, NodeType.SOURCE, 0l, type, subType ) )
.thenReturn( shardCount );
ArgumentCaptor<Long> shardValue = ArgumentCaptor.forClass( Long.class );
ArgumentCaptor<Long> timestampValue = ArgumentCaptor.forClass( Long.class );
-
-
//mock up our mutation
when( edgeShardSerialization
- .writeEdgeMeta( same( scope ), same( nodeId ), eq(NodeType.SOURCE), shardValue.capture(), timestampValue.capture(), same( type ), same( subType ) ) )
+ .writeEdgeMeta( same( scope ), same( nodeId ), eq( NodeType.SOURCE ), shardValue.capture(),
+ timestampValue.capture(), same( type ), same( subType ) ) )
.thenReturn( mock( MutationBatch.class ) );
- final SimpleMarkedEdge returnedEdge = new SimpleMarkedEdge( nodeId, type, createId("subType"), 10005l, false );
- final Iterator<MarkedEdge> edgeIterator = Collections.singleton( (MarkedEdge)returnedEdge ).iterator();
+ final SimpleMarkedEdge returnedEdge =
+ new SimpleMarkedEdge( nodeId, type, createId( "subType" ), 10005l, false );
+ final Iterator<MarkedEdge> edgeIterator = Collections.singleton( ( MarkedEdge ) returnedEdge ).iterator();
//mock up returning the value
- when( shardedEdgeSerialization.getEdgesFromSourceByTargetType( same( edgeColumnFamilies ), same( scope ),
- any( SearchByIdType.class ), any(Iterator.class) )).thenReturn( edgeIterator );
+ when( shardedEdgeSerialization
+ .getEdgesFromSourceByTargetType( same( edgeColumnFamilies ), same( scope ), any( SearchByIdType.class ),
+ any( Iterator.class ) ) ).thenReturn( edgeIterator );
-
- final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.SOURCE, type, subType );
+ final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.SOURCE, type, subType );
assertTrue( "Shard allocated", result );
@@ -350,41 +346,37 @@ public class NodeShardAllocationTest {
assertEquals( "Expected time service time", timeservicetime, savedTimestamp );
-
//now check our max value was set
final long savedShardPivot = shardValue.getValue();
- assertEquals("Expected max value to be the same", returnedEdge.getTimestamp(), savedShardPivot);
+ assertEquals( "Expected max value to be the same", returnedEdge.getTimestamp(), savedShardPivot );
}
-
-
@Test
public void futureCountShardCleanup() {
final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
- final EdgeColumnFamilies edgeColumnFamilies = mock(EdgeColumnFamilies.class);
+ final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
- final ShardedEdgeSerialization shardedEdgeSerialization = mock(ShardedEdgeSerialization.class);
+ final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
- final NodeShardApproximation nodeShardApproximation =
- mock( NodeShardApproximation.class );
+ final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
final TimeService timeService = mock( TimeService.class );
final Keyspace keyspace = mock( Keyspace.class );
- final MutationBatch batch = mock(MutationBatch.class);
+ final MutationBatch batch = mock( MutationBatch.class );
- when(keyspace.prepareMutationBatch()).thenReturn( batch );
+ when( keyspace.prepareMutationBatch() ).thenReturn( batch );
NodeShardAllocation approximation =
- new NodeShardAllocationImpl(edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, nodeShardApproximation, timeService,
- graphFig, keyspace );
+ new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+ nodeShardApproximation, timeService, graphFig, keyspace );
final Id nodeId = createId( "test" );
final String type = "type";
@@ -392,100 +384,132 @@ public class NodeShardAllocationTest {
/**
- * Use the time service to generate UUIDS
+ * Use the time service to generate timestamps
*/
- final long timeservicetime = System.currentTimeMillis();
+ final long timeservicetime = 10000;
when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
- assertTrue("Shard cache mocked", graphFig.getShardCacheTimeout() > 0);
+ assertTrue( "Shard cache mocked", graphFig.getShardCacheTimeout() > 0 );
/**
* Simulates clock drift when 2 nodes create future shards near one another
*/
- final long futureTime = timeService.getCurrentTime() + 2 * graphFig.getShardCacheTimeout();
+ final long minDelta = graphFig.getShardMinDelta();
+
+ final Shard minShard = new Shard( 0l, 0l );
- final Shard minShard = new Shard(0l, 0l);
+ //a shard that isn't our minimum, but exists after compaction
+ final Shard compactedShard = new Shard( 5000, 1000 );
/**
- * Simulate slow node
+ * Simulate different node time allocation
*/
- //our second shard is the "oldest", and hence should be returned in the iterator. Future shard 1 and 3 should be removed
- final Shard futureShard1 = new Shard(futureTime - 1, timeservicetime+1000);
+ final long minTime = 10000;
+ //our second shard is the "oldest", and hence should be returned in the iterator. Future shard 1 and 3
+ // should be removed
+
+ //this should get dropped. It's allocated after future shard2 even though the time is less
+ final Shard futureShard1 = new Shard( 10000, minTime + minDelta );
- final Shard futureShard2 = new Shard(futureTime + 10000, timeservicetime);
+ //should get kept.
+ final Shard futureShard2 = new Shard( 10005, minTime );
- final Shard futureShard3 = new Shard(futureShard2.getShardIndex() + 10000, timeservicetime+2000);
+ //should be removed
+ final Shard futureShard3 = new Shard( 10010, minTime + minDelta / 2 );
/**
* Mock up returning a min shard
*/
when( edgeShardSerialization
- .getEdgeMetaData( same( scope ), same( nodeId ), eq(NodeType.TARGET), any( Optional.class ), same( type ),
- same( subType ) ) ).thenReturn(
- Arrays.asList( futureShard3, futureShard2, futureShard1, minShard ).iterator() );
-
+ .getEdgeMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
+ same( type ), same( subType ) ) ).thenReturn(
+ Arrays.asList( futureShard3, futureShard2, futureShard1, compactedShard, minShard ).iterator() );
ArgumentCaptor<Long> newLongValue = ArgumentCaptor.forClass( Long.class );
-
-
//mock up our mutation
when( edgeShardSerialization
- .removeEdgeMeta( same( scope ), same( nodeId ), eq(NodeType.TARGET), newLongValue.capture(), same( type ), same( subType ) ) )
- .thenReturn( mock( MutationBatch.class ) );
+ .removeEdgeMeta( same( scope ), same( nodeId ), eq( NodeType.TARGET ), newLongValue.capture(),
+ same( type ), same( subType ) ) ).thenReturn( mock( MutationBatch.class ) );
- final Iterator<Shard>
- result = approximation.getShards( scope, nodeId, NodeType.TARGET, Optional.<Shard>absent(), type, subType );
+ final Iterator<ShardEntryGroup> result =
+ approximation.getShards( scope, nodeId, NodeType.TARGET, Optional.<Shard>absent(), type, subType );
assertTrue( "Shards present", result.hasNext() );
- assertEquals("Only single next shard returned", futureShard2, result.next());
- assertTrue("Shards present", result.hasNext());
+ ShardEntryGroup shardEntryGroup = result.next();
- assertEquals("Previous shard present", 0l, result.next().getShardIndex());
+ assertEquals( "Future shard returned", futureShard1, shardEntryGroup.getMergeTarget() );
- assertFalse("No shards left", result.hasNext());
- /**
- * Now we need to verify that both our mutations have been added
- */
+ //now verify all 4 are in this group. This is because the first shard (0,0) (n-1) may be the only shard other
+ //nodes see while we're rolling our state. This means it should be read and merged from as well
- List<Long> values = newLongValue.getAllValues();
+ Collection<Shard> writeShards = shardEntryGroup.getWriteShards();
- assertEquals("2 values removed", 2, values.size());
+ assertEquals( "Shard size as expected", 4, writeShards.size() );
- assertEquals("Deleted Max Future", futureShard1.getShardIndex(), values.get( 0 ).longValue());
- assertEquals("Deleted Next Future", futureShard3.getShardIndex(), values.get( 1 ).longValue());
+ assertTrue( writeShards.contains( futureShard1 ) );
+ assertTrue( writeShards.contains( futureShard2 ) );
+ assertTrue( writeShards.contains( futureShard3 ) );
+ assertTrue( writeShards.contains( compactedShard ) );
- }
+
+ Collection<Shard> readShards = shardEntryGroup.getReadShards( minTime + minDelta );
+
+ assertEquals( "Shard size as expected", 4, readShards.size() );
+
+ assertTrue( readShards.contains( futureShard1 ) );
+ assertTrue( readShards.contains( futureShard2 ) );
+ assertTrue( readShards.contains( futureShard3 ) );
+ assertTrue( readShards.contains( compactedShard ) );
+
+
+ assertTrue( "Shards present", result.hasNext() );
+
+ shardEntryGroup = result.next();
+
+ writeShards = shardEntryGroup.getWriteShards();
+
+
+ assertTrue( "Previous shard present", writeShards.contains( minShard ) );
+
+
+ writeShards = shardEntryGroup.getReadShards( minTime + minDelta );
+ assertTrue( "Previous shard present", writeShards.contains( minShard ) );
+
+
+ assertFalse( "No shards left", result.hasNext() );
+ }
@Test
public void noShardsReturns() {
final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
- final EdgeColumnFamilies edgeColumnFamilies = mock(EdgeColumnFamilies.class);
+ final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
- final ShardedEdgeSerialization shardedEdgeSerialization = mock(ShardedEdgeSerialization.class);
+ final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
- final NodeShardApproximation nodeShardApproximation =
- mock( NodeShardApproximation.class );
+ final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
final TimeService timeService = mock( TimeService.class );
+ when( timeService.getCurrentTime() ).thenReturn( 10000l );
+
final Keyspace keyspace = mock( Keyspace.class );
final MutationBatch batch = mock( MutationBatch.class );
@@ -493,8 +517,8 @@ public class NodeShardAllocationTest {
when( keyspace.prepareMutationBatch() ).thenReturn( batch );
NodeShardAllocation approximation =
- new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, nodeShardApproximation, timeService,
- graphFig, keyspace );
+ new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+ nodeShardApproximation, timeService, graphFig, keyspace );
final Id nodeId = createId( "test" );
final String type = "type";
@@ -504,15 +528,114 @@ public class NodeShardAllocationTest {
* Mock up returning an empty iterator, our audit shouldn't create a new shard
*/
when( edgeShardSerialization
- .getEdgeMetaData( same( scope ), same( nodeId ), eq(NodeType.TARGET), any( Optional.class ), same( type ),
- same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
+ .getEdgeMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
+ same( type ), same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
+
+ final Iterator<ShardEntryGroup> result =
+ approximation.getShards( scope, nodeId, NodeType.TARGET, Optional.<Shard>absent(), type, subType );
+
+
+ ShardEntryGroup shardEntryGroup = result.next();
+
+ final Shard expected = new Shard( 0, 0 );
+
+ assertEquals( "Future shard returned", expected, shardEntryGroup.getMergeTarget() );
+
+
+ //no shards are stored, so the (0,0) shard is the one we expect back. Verify it is present in both the
+ //write shards and the read shards of this group
+
+ Collection<Shard> writeShards = shardEntryGroup.getWriteShards();
- final Iterator<Shard> result = approximation.getShards( scope, nodeId, NodeType.TARGET, Optional.<Shard>absent(), type,
- subType );
+ Collection<Shard> readShards = shardEntryGroup.getReadShards( 10000l );
+
+
+ assertTrue( "0 shard allocated", writeShards.contains( expected ) );
+
+ assertTrue( "0 shard allocated", readShards.contains( expected ) );
- assertEquals("0 shard allocated", 0l, result.next().getShardIndex());
assertFalse( "No shard allocated", result.hasNext() );
}
+
+    /**
+     * Checks how {@code getMinTime()} reacts to the configured shard min delta: a delta just below twice the
+     * shard cache timeout must raise a {@link GraphRuntimeException}, while deltas of 2x and 4x the timeout
+     * must return the mocked current time minus that delta.
+     */
+    @Test
+    public void invalidConfiguration() {
+
+        // local mock; its min delta stubbing is changed several times below
+        final GraphFig graphFig = mock( GraphFig.class );
+
+        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
+
+
+        // the time service always reports 100000 milliseconds
+        final TimeService timeService = mock( TimeService.class );
+
+        final long time = 100000L;
+
+        when( timeService.getCurrentTime() ).thenReturn( time );
+
+
+        final long cacheTimeout = 30000L;
+
+        when( graphFig.getShardCacheTimeout() ).thenReturn( cacheTimeout );
+
+        // 1% short of twice the cache timeout
+        final long tooSmallDelta = ( long ) ( ( cacheTimeout * 2 ) * .99 );
+
+        when( graphFig.getShardMinDelta() ).thenReturn( tooSmallDelta );
+
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch batch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+        final NodeShardAllocation approximation =
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardApproximation, timeService, graphFig, keyspace );
+
+        // the undersized delta should be rejected
+        try {
+            approximation.getMinTime();
+            fail( "Should have thrown a GraphRuntimeException" );
+        }
+        catch ( GraphRuntimeException expected ) {
+            // expected: this is the failure the test is looking for
+        }
+
+        // a delta of exactly twice the cache timeout is accepted
+        final long minDelta = cacheTimeout * 2;
+
+        when( graphFig.getShardMinDelta() ).thenReturn( minDelta );
+
+        long returned = approximation.getMinTime();
+
+        long expectedReturned = time - minDelta;
+
+        assertEquals( expectedReturned, returned );
+
+        // a larger delta is accepted as well
+        final long delta = cacheTimeout * 4;
+
+        when( graphFig.getShardMinDelta() ).thenReturn( delta );
+
+        returned = approximation.getMinTime();
+
+        expectedReturned = time - delta;
+
+        assertEquals( expectedReturned, returned );
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ecd603f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
new file mode 100644
index 0000000..4e23d83
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Test for the group functionality
+ */
+public class ShardEntryGroupTest {
+
+    /** A group holding exactly one shard: that shard is the merge target and is neither deletable nor compactable. */
+    @Test
+    public void singleEntry() {
+
+        final long delta = 10000;
+
+        final Shard rootShard = new Shard( 0, 0 );
+
+        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+        final boolean result = shardEntryGroup.addShard( rootShard );
+
+        assertTrue( "Shard added", result );
+        assertFalse( "Single shard cannot be deleted", shardEntryGroup.canBeDeleted( rootShard ) );
+        assertSame( "Same shard for merge target", rootShard, shardEntryGroup.getMergeTarget() );
+        assertFalse( "Merge cannot be run with a single shard", shardEntryGroup.needsCompaction( 0 ) );
+    }
+
+
+    /**
+     * Presumably meant to cover two shards allocated within the same delta window. Only {@code firstShard} is
+     * added so far, so the assertions below still describe a single-shard group.
+     */
+    @Test
+    public void allocatedWithinDelta() {
+
+        final long delta = 10000;
+
+        final Shard firstShard = new Shard( 1000, 1000 );
+
+        // TODO: add this shard to the group and assert the within-delta behaviour; it is not exercised yet
+        final Shard secondShard = new Shard( 1000, 1000 );
+
+
+        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+        final boolean result = shardEntryGroup.addShard( firstShard );
+
+        assertTrue( "Shard added", result );
+        assertFalse( "Single shard cannot be deleted", shardEntryGroup.canBeDeleted( firstShard ) );
+        assertSame( "Same shard for merge target", firstShard, shardEntryGroup.getMergeTarget() );
+        assertFalse( "Merge cannot be run with a single shard", shardEntryGroup.needsCompaction( 0 ) );
+    }
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3ecd603f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
index 51448b7..2879d5b 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
@@ -454,6 +454,12 @@ public class NodeShardApproximationTest {
@Override
+ public long getShardMinDelta() {
+ return 0; // fixed stub value; TODO(review): confirm no test in this class depends on the min delta
+ }
+
+
+ @Override
public long getShardCacheSize() {
return 0; //To change body of implemented methods use File | Settings | File Templates.
}