You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/14 17:20:21 UTC
[2/6] Implemented new rolling shard algorithm. This should allow
eventual shard consistency between all clients without the need for an
external locking allocation system.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index 5c846f1..4130771 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -20,19 +20,23 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
-import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.apache.cassandra.thrift.Mutation;
+
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.ShardKey;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByIdType;
+import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
+import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
@@ -40,11 +44,13 @@ import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import com.google.common.base.Optional;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import static junit.framework.TestCase.assertTrue;
import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
@@ -76,53 +82,100 @@ public class NodeShardAllocationTest {
when( graphFig.getShardCacheSize() ).thenReturn( 10000l );
when( graphFig.getShardSize() ).thenReturn( 20000l );
- when( graphFig.getShardCacheTimeout()).thenReturn( 30000l );
+
+ final long timeout = 30000;
+ when( graphFig.getShardCacheTimeout() ).thenReturn( timeout );
+ when( graphFig.getShardMinDelta() ).thenReturn( timeout * 2 );
}
@Test
- public void noShards() {
+ public void minTime() {
final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
- final NodeShardApproximation nodeShardCounterSerialization =
- mock( NodeShardApproximation.class );
+ final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+ final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+ final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
final TimeService timeService = mock( TimeService.class );
final Keyspace keyspace = mock( Keyspace.class );
- final MutationBatch batch = mock( MutationBatch.class );
-
- when( keyspace.prepareMutationBatch() ).thenReturn( batch );
NodeShardAllocation approximation =
- new NodeShardAllocationImpl( edgeShardSerialization, nodeShardCounterSerialization, timeService,
- graphFig );
+ new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+ nodeShardCounterSerialization, timeService, graphFig, keyspace );
- final Id nodeId = createId( "test" );
- final String type = "type";
- final String subType = "subType";
- /**
- * Mock up returning an empty iterator, our audit shouldn't create a new shard
- */
- when( edgeShardSerialization
- .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ), same( type ),
- same( subType ) ) ).thenReturn( Collections.<Long>emptyList().iterator() );
+ final long timeservicetime = System.currentTimeMillis();
- final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+ when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
- assertFalse( "No shard allocated", result );
+ final long expected = timeservicetime - 2 * graphFig.getShardCacheTimeout();
+
+ final long returned = approximation.getMinTime();
+
+ assertEquals( "Correct time was returned", expected, returned );
}
+ // @Test
+ // public void noShards() {
+ // final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+ //
+ // final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+ //
+ // final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+ //
+ //
+ // final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
+ //
+ //
+ // final TimeService timeService = mock( TimeService.class );
+ //
+ // final Keyspace keyspace = mock( Keyspace.class );
+ //
+ // final MutationBatch batch = mock( MutationBatch.class );
+ //
+ // when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+ //
+ // NodeShardAllocation approximation =
+ // new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+ // nodeShardCounterSerialization, timeService, graphFig, keyspace );
+ //
+ // final Id nodeId = createId( "test" );
+ // final String type = "type";
+ // final String subType = "subType";
+ //
+ // /**
+ // * Mock up returning an empty iterator, our audit shouldn't create a new shard
+ // */
+ //
+ // final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType(nodeId, type, subType );
+ //
+ //
+ // when( edgeShardSerialization
+ // .getShardMetaData( same( scope ), any( Optional.class ), same(targetEdgeMeta)) ).thenReturn(
+ // Collections.<Shard>emptyList().iterator() );
+ //
+ // final boolean result = approximation.auditShard(scope, null, targetEdgeMeta);
+ //
+ // assertFalse( "No shard allocated", result );
+ // }
+
+
@Test
- public void existingFutureShard() {
+ public void existingFutureShardSameTime() {
final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
- final NodeShardApproximation nodeShardCounterSerialization =
- mock( NodeShardApproximation.class );
+ final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+ final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+ final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
final TimeService timeService = mock( TimeService.class );
@@ -136,8 +189,8 @@ public class NodeShardAllocationTest {
NodeShardAllocation approximation =
- new NodeShardAllocationImpl( edgeShardSerialization, nodeShardCounterSerialization, timeService,
- graphFig );
+ new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+ nodeShardCounterSerialization, timeService, graphFig, keyspace );
final Id nodeId = createId( "test" );
final String type = "type";
@@ -148,16 +201,21 @@ public class NodeShardAllocationTest {
when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
- final long futureShard = timeservicetime + graphFig.getShardCacheTimeout() * 2 ;
+ final Shard futureShard = new Shard( 10000l, timeservicetime, true );
+
+ final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 1000l );
+ shardEntryGroup.addShard( futureShard );
+
+ final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( nodeId, type, subType );
/**
* Mock up returning a min shard, and a future shard
*/
- when( edgeShardSerialization
- .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ), same( type ),
- same( subType ) ) ).thenReturn( Arrays.asList( futureShard ).iterator() );
+ when( edgeShardSerialization.getShardMetaData( same( scope ), any( Optional.class ), same( targetEdgeMeta ) ) )
+ .thenReturn( Arrays.asList( futureShard ).iterator() );
- final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+
+ final boolean result = approximation.auditShard( scope, shardEntryGroup, targetEdgeMeta );
assertFalse( "No shard allocated", result );
}
@@ -167,8 +225,11 @@ public class NodeShardAllocationTest {
public void lowCountFutureShard() {
final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
- final NodeShardApproximation nodeShardApproximation =
- mock( NodeShardApproximation.class );
+ final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+ final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+ final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
final TimeService timeService = mock( TimeService.class );
@@ -181,8 +242,8 @@ public class NodeShardAllocationTest {
NodeShardAllocation approximation =
- new NodeShardAllocationImpl( edgeShardSerialization, nodeShardApproximation, timeService,
- graphFig );
+ new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+ nodeShardApproximation, timeService, graphFig, keyspace );
final Id nodeId = createId( "test" );
final String type = "type";
@@ -193,23 +254,32 @@ public class NodeShardAllocationTest {
when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
+ final Shard futureShard = new Shard( 10000l, timeservicetime, true );
+
+ final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 1000l );
+ shardEntryGroup.addShard( futureShard );
+
+ final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( nodeId, type, subType );
+
+
+
/**
* Mock up returning a min shard, and a future shard
*/
when( edgeShardSerialization
- .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ), same( type ),
- same( subType ) ) ).thenReturn( Arrays.asList( 0l ).iterator() );
+ .getShardMetaData( same( scope ), any( Optional.class ), same(targetEdgeMeta)) )
+ .thenReturn( Arrays.asList( new Shard( 0l, 0l, true ) ).iterator() );
//return a shard size < our max by 1
final long count = graphFig.getShardSize() - 1;
- when( nodeShardApproximation.getCount(scope, nodeId, 0l, type, subType ))
- .thenReturn( count );
+ when( nodeShardApproximation.getCount( scope, futureShard, targetEdgeMeta ) )
+ .thenReturn( count );
- final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+ final boolean result = approximation.auditShard( scope, shardEntryGroup, targetEdgeMeta);
assertFalse( "Shard allocated", result );
}
@@ -219,22 +289,25 @@ public class NodeShardAllocationTest {
public void equalCountFutureShard() {
final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
- final NodeShardApproximation nodeShardApproximation =
- mock( NodeShardApproximation.class );
+ final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+ final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+ final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
final TimeService timeService = mock( TimeService.class );
final Keyspace keyspace = mock( Keyspace.class );
- final MutationBatch batch = mock(MutationBatch.class);
+ final MutationBatch batch = mock( MutationBatch.class );
- when(keyspace.prepareMutationBatch()).thenReturn( batch );
+ when( keyspace.prepareMutationBatch() ).thenReturn( batch );
NodeShardAllocation approximation =
- new NodeShardAllocationImpl( edgeShardSerialization, nodeShardApproximation, timeService,
- graphFig );
+ new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+ nodeShardApproximation, timeService, graphFig, keyspace );
final Id nodeId = createId( "test" );
final String type = "type";
@@ -245,69 +318,92 @@ public class NodeShardAllocationTest {
when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
+ final Shard futureShard = new Shard( 0l, 0l, true );
+
+ final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 1000l );
+ shardEntryGroup.addShard( futureShard );
+
+ final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( nodeId, type, subType );
+
+
/**
* Mock up returning a min shard
*/
when( edgeShardSerialization
- .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ), same( type ),
- same( subType ) ) ).thenReturn( Arrays.asList( 0l ).iterator() );
+ .getShardMetaData( same( scope ), any( Optional.class ), same(targetEdgeMeta) ) )
+ .thenReturn( Arrays.asList( futureShard ).iterator() );
final long shardCount = graphFig.getShardSize();
//return a shard size equal to our max
- when( nodeShardApproximation
- .getCount( scope , nodeId, 0l,type , subType ))
+ when( nodeShardApproximation.getCount( scope, futureShard, targetEdgeMeta) )
.thenReturn( shardCount );
- ArgumentCaptor<Long> newUUIDValue = ArgumentCaptor.forClass( Long.class );
+ ArgumentCaptor<Shard> shardValue = ArgumentCaptor.forClass( Shard.class );
//mock up our mutation
when( edgeShardSerialization
- .writeEdgeMeta( same( scope ), same( nodeId ), newUUIDValue.capture(), same( type ), same( subType ) ) )
- .thenReturn( mock( MutationBatch.class ) );
+ .writeShardMeta( same( scope ), shardValue.capture(), same(targetEdgeMeta) ) ).thenReturn( mock( MutationBatch.class ) );
- final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+ final SimpleMarkedEdge returnedEdge =
+ new SimpleMarkedEdge( nodeId, type, createId( "subType" ), 10005l, false );
+
+ final Iterator<MarkedEdge> edgeIterator = Collections.singleton( ( MarkedEdge ) returnedEdge ).iterator();
+
+ //mock up returning the value
+ when( shardedEdgeSerialization
+ .getEdgesFromSourceByTargetType( same( edgeColumnFamilies ), same( scope ), any( SearchByIdType.class ),
+ any( Iterator.class ) ) ).thenReturn( edgeIterator );
+
+
+ final boolean result = approximation.auditShard( scope, shardEntryGroup, targetEdgeMeta);
assertTrue( "Shard allocated", result );
//check our new allocated UUID
- final long expectedTime = timeservicetime + 2 * graphFig.getShardCacheTimeout();
- final long savedTimestamp = newUUIDValue.getValue();
+ final long savedTimestamp = shardValue.getValue().getCreatedTime();
+ assertEquals( "Expected time service time", timeservicetime, savedTimestamp );
- assertEquals( "Expected at 2x timeout generated", expectedTime, savedTimestamp );
- }
+ //now check our max value was set
+ final long savedShardPivot = shardValue.getValue().getShardIndex();
+
+ assertEquals( "Expected max value to be the same", returnedEdge.getTimestamp(), savedShardPivot );
+ }
@Test
public void futureCountShardCleanup() {
final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
- final NodeShardApproximation nodeShardApproximation =
- mock( NodeShardApproximation.class );
+ final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+ final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+ final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
final TimeService timeService = mock( TimeService.class );
final Keyspace keyspace = mock( Keyspace.class );
- final MutationBatch batch = mock(MutationBatch.class);
+ final MutationBatch batch = mock( MutationBatch.class );
- when(keyspace.prepareMutationBatch()).thenReturn( batch );
+ when( keyspace.prepareMutationBatch() ).thenReturn( batch );
NodeShardAllocation approximation =
- new NodeShardAllocationImpl( edgeShardSerialization, nodeShardApproximation, timeService,
- graphFig );
+ new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+ nodeShardApproximation, timeService, graphFig, keyspace );
final Id nodeId = createId( "test" );
final String type = "type";
@@ -315,94 +411,133 @@ public class NodeShardAllocationTest {
/**
- * Use the time service to generate UUIDS
+ * Use the time service to generate timestamps
*/
- final long timeservicetime = System.currentTimeMillis();
+ final long timeservicetime = 10000;
when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
- assertTrue("Shard cache mocked", graphFig.getShardCacheTimeout() > 0);
+ assertTrue( "Shard cache mocked", graphFig.getShardCacheTimeout() > 0 );
/**
* Simulates clock drift when 2 nodes create future shards near one another
*/
- final long futureTime = timeService.getCurrentTime() + 2 * graphFig.getShardCacheTimeout();
+ final long minDelta = graphFig.getShardMinDelta();
+
+
+ final Shard minShard = new Shard( 0l, 0l, true );
+ //a shard that isn't our minimum, but exists after compaction
+ final Shard compactedShard = new Shard( 5000, 1000, true );
/**
- * Simulate slow node
+ * Simulate different node time allocation
*/
- final long futureShard1 = futureTime - 1;
- final long futureShard2 = futureTime + 10000;
+ final long minTime = 10000;
+ //our second shard is the "oldest", and hence should be returned in the iterator. Future shard 1 and 3
+ // should be removed
- final long futureShard3 = futureShard2 + 10000;
+ //this should get dropped, It's allocated after future shard2 even though the time is less
+ final Shard futureShard1 = new Shard( 10000, minTime + minDelta, false );
+ //should get kept.
+ final Shard futureShard2 = new Shard( 10005, minTime, false );
- final int pageSize = 100;
+ //should be removed
+ final Shard futureShard3 = new Shard( 10010, minTime + minDelta / 2, false );
+
+ final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( nodeId, type, subType );
/**
* Mock up returning a min shard
*/
when( edgeShardSerialization
- .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ), same( type ),
- same( subType ) ) ).thenReturn( Arrays.asList(futureShard3, futureShard2, futureShard1, 0l).iterator() );
+ .getShardMetaData( same( scope ), any( Optional.class ), same(directedEdgeMeta)) ).thenReturn(
+ Arrays.asList( futureShard3, futureShard2, futureShard1, compactedShard, minShard ).iterator() );
+ ArgumentCaptor<Shard> newLongValue = ArgumentCaptor.forClass( Shard.class );
- ArgumentCaptor<Long> newLongValue = ArgumentCaptor.forClass( Long.class );
+ //mock up our mutation
+ when( edgeShardSerialization
+ .removeShardMeta( same( scope ), newLongValue.capture(), same(directedEdgeMeta)) ).thenReturn( mock( MutationBatch.class ) );
+ final Iterator<ShardEntryGroup> result =
+ approximation.getShards( scope, Optional.<Shard>absent(), directedEdgeMeta);
- //mock up our mutation
- when( edgeShardSerialization
- .removeEdgeMeta( same( scope ), same( nodeId ), newLongValue.capture(), same( type ), same( subType ) ) )
- .thenReturn( mock( MutationBatch.class ) );
+ assertTrue( "Shards present", result.hasNext() );
+
+
+ ShardEntryGroup shardEntryGroup = result.next();
+
+ assertEquals( "Future shard returned", futureShard1, shardEntryGroup.getCompactionTarget() );
- final Iterator<Long>
- result = approximation.getShards( scope, nodeId, Optional.<Long>absent(), type, subType );
+
+ //now verify all 4 are in this group. This is because the first shard (0,0) (n-1) may be the only shard other
+ //nodes see while we're rolling our state. This means it should be read and merged from as well
+
+ Collection<Shard> writeShards = shardEntryGroup.getWriteShards( minTime + minDelta );
+
+ assertEquals( "Shard size as expected", 4, writeShards.size() );
+
+ assertTrue( writeShards.contains( futureShard1 ) );
+ assertTrue( writeShards.contains( futureShard2 ) );
+ assertTrue( writeShards.contains( futureShard3 ) );
+ assertTrue( writeShards.contains( compactedShard ) );
+
+
+ Collection<Shard> readShards = shardEntryGroup.getReadShards();
+
+ assertEquals( "Shard size as expected", 4, readShards.size() );
+
+ assertTrue( readShards.contains( futureShard1 ) );
+ assertTrue( readShards.contains( futureShard2 ) );
+ assertTrue( readShards.contains( futureShard3 ) );
+ assertTrue( readShards.contains( compactedShard ) );
assertTrue( "Shards present", result.hasNext() );
- assertEquals("Only single next shard returned", futureShard1, result.next().longValue());
+ shardEntryGroup = result.next();
- assertTrue("Shards present", result.hasNext());
- assertEquals("Previous shard present", 0l, result.next().longValue());
+ writeShards = shardEntryGroup.getWriteShards( minTime + minDelta );
- assertFalse("No shards left", result.hasNext());
- /**
- * Now we need to verify that both our mutations have been added
- */
+ assertTrue( "Previous shard present", writeShards.contains( minShard ) );
- List<Long> values = newLongValue.getAllValues();
- assertEquals("2 values removed", 2, values.size());
+ writeShards = shardEntryGroup.getReadShards();
- assertEquals("Deleted Max Future", futureShard3, values.get( 0 ).longValue());
- assertEquals("Deleted Next Future", futureShard2, values.get( 1 ).longValue());
- }
+ assertTrue( "Previous shard present", writeShards.contains( minShard ) );
+ assertFalse( "No shards left", result.hasNext() );
+ }
@Test
- public void noShardsReturns() {
+ public void noShardsReturns() throws ConnectionException {
final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
- final NodeShardApproximation nodeShardApproximation =
- mock( NodeShardApproximation.class );
+ final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+ final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+ final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
final TimeService timeService = mock( TimeService.class );
+ when( timeService.getCurrentTime() ).thenReturn( 10000l );
+
final Keyspace keyspace = mock( Keyspace.class );
final MutationBatch batch = mock( MutationBatch.class );
@@ -410,25 +545,139 @@ public class NodeShardAllocationTest {
when( keyspace.prepareMutationBatch() ).thenReturn( batch );
NodeShardAllocation approximation =
- new NodeShardAllocationImpl( edgeShardSerialization, nodeShardApproximation, timeService,
- graphFig );
+ new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+ nodeShardApproximation, timeService, graphFig, keyspace );
final Id nodeId = createId( "test" );
final String type = "type";
final String subType = "subType";
+ final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( nodeId, type, subType );
+
+
+
/**
* Mock up returning an empty iterator, our audit shouldn't create a new shard
*/
when( edgeShardSerialization
- .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ), same( type ),
- same( subType ) ) ).thenReturn( Collections.<Long>emptyList().iterator() );
+ .getShardMetaData( same( scope ), any( Optional.class ), same(directedEdgeMeta) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
+
+
+ ArgumentCaptor<Shard> shardArgumentCaptor = ArgumentCaptor.forClass( Shard.class );
+
+ when( edgeShardSerialization
+ .writeShardMeta( same( scope ), shardArgumentCaptor.capture(), same(directedEdgeMeta)) ).thenReturn( batch );
+
+
+ final Iterator<ShardEntryGroup> result =
+ approximation.getShards( scope, Optional.<Shard>absent(), directedEdgeMeta );
+
+
+ ShardEntryGroup shardEntryGroup = result.next();
+
+ final Shard rootShard = new Shard( 0, 0, true );
+
+ assertEquals( "Shard size expected", 1, shardEntryGroup.entrySize() );
- final Iterator<Long> result = approximation.getShards( scope, nodeId, Optional.<Long>absent(), type, subType );
- assertEquals("0 shard allocated", 0l, result.next().longValue());
+ //ensure we persisted the new shard.
+ assertEquals( "Root shard was persisted", rootShard, shardArgumentCaptor.getValue() );
- assertFalse( "No shard allocated", result.hasNext() );
+
+ //now verify the root shard is the only member of this group. It must be returned for both
+ //writes and reads, which the assertions below check
+
+ Collection<Shard> writeShards = shardEntryGroup.getWriteShards( timeService.getCurrentTime() );
+
+ Collection<Shard> readShards = shardEntryGroup.getReadShards();
+
+
+ assertTrue( "root shard allocated", writeShards.contains( rootShard ) );
+
+ assertTrue( "root shard allocated", readShards.contains( rootShard ) );
+
+
+ assertFalse( "No other shard group allocated", result.hasNext() );
}
+
+ @Test
+ public void invalidConfiguration() {
+
+ final GraphFig graphFig = mock( GraphFig.class );
+
+ final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+ final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+ final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+ final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
+
+
+ /**
+ * Return 100000 milliseconds
+ */
+ final TimeService timeService = mock( TimeService.class );
+
+ final long time = 100000l;
+
+ when( timeService.getCurrentTime() ).thenReturn( time );
+
+
+ final long cacheTimeout = 30000l;
+
+ when( graphFig.getShardCacheTimeout() ).thenReturn( 30000l );
+
+
+ final long tooSmallDelta = ( long ) ( ( cacheTimeout * 2 ) * .99 );
+
+ when( graphFig.getShardMinDelta() ).thenReturn( tooSmallDelta );
+
+
+ final Keyspace keyspace = mock( Keyspace.class );
+
+ final MutationBatch batch = mock( MutationBatch.class );
+
+ when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+
+ NodeShardAllocation approximation =
+ new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+ nodeShardApproximation, timeService, graphFig, keyspace );
+
+
+ /**
+ * Should throw an exception
+ */
+ try {
+ approximation.getMinTime();
+ fail( "Should have thrown a GraphRuntimeException" );
+ }
+ catch ( GraphRuntimeException gre ) {
+ //swallow
+ }
+
+ //now test something that passes.
+
+ final long minDelta = cacheTimeout * 2;
+
+ when( graphFig.getShardMinDelta() ).thenReturn( minDelta );
+
+ long returned = approximation.getMinTime();
+
+ long expectedReturned = time - minDelta;
+
+ assertEquals( expectedReturned, returned );
+
+ final long delta = cacheTimeout * 4;
+
+ when( graphFig.getShardMinDelta() ).thenReturn( delta );
+
+ returned = approximation.getMinTime();
+
+ expectedReturned = time - delta;
+
+ assertEquals( expectedReturned, returned );
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
index 6c46c32..f00a380 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
@@ -22,15 +22,16 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
-import java.util.UUID;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardCacheImpl;
-import org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
@@ -39,6 +40,9 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
@@ -76,6 +80,8 @@ public class NodeShardCacheTest {
final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
+ final TimeService time = mock(TimeService.class);
+
final Id id = createId( "test" );
final String edgeType = "edge";
@@ -86,78 +92,67 @@ public class NodeShardCacheTest {
final long newTime = 10000l;
- NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
+ NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig, time );
final Optional max = Optional.absent();
- /**
- * Simulate returning no shards at all.
- */
- when( allocation
- .getShards( same( scope ), same( id ), same( max), same( edgeType ),
- same( otherIdType ) ) )
- .thenReturn( Collections.singletonList( 0l ).iterator() );
-
- long slice = cache.getSlice( scope, id, newTime, edgeType, otherIdType );
+ final ShardEntryGroup group = new ShardEntryGroup( newTime );
+ group.addShard( new Shard(0, 0, true) );
- //we return the min UUID possible, all edges should start by writing to this edge
- assertEquals(0l, slice );
+ DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( id, edgeType, otherIdType ) ;
/**
- * Verify that we fired the audit
+ * Simulate returning no shards at all.
*/
- verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
- }
-
-
- @Test
- public void testSingleExistingShard() {
-
- final GraphFig graphFig = getFigMock();
-
- final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
+ when( allocation.getShards( same( scope ), same( max ), same( directedEdgeMeta ) ) )
+ //use "thenAnswer" so we always return the value, even if it's invoked more than 1 time.
+ .thenAnswer( new Answer<Iterator<ShardEntryGroup>>() {
- final Id id = createId( "test" );
+ @Override
+ public Iterator<ShardEntryGroup> answer( final InvocationOnMock invocationOnMock )
+ throws Throwable {
+ return Collections.singletonList( group ).iterator();
+ }
+ });
- final String edgeType = "edge";
- final String otherIdType = "type";
+ ShardEntryGroup returnedGroup = cache.getWriteShardGroup( scope, newTime, directedEdgeMeta );
+ //ensure it's the same group
+ assertSame(group, returnedGroup);
- final long newTime = 10000l;
- final long min = 0;
+ Iterator<ShardEntryGroup>
+ shards = cache.getReadShardGroup( scope, newTime, directedEdgeMeta );
+ assertTrue(shards.hasNext());
- NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
+ returnedGroup = shards.next();
- final Optional max = Optional.absent();
+ assertSame("Single shard group expected", group, returnedGroup);
- /**
- * Simulate returning single shard
- */
- when( allocation.getShards( same( scope ), same( id ), same(max),
- same( edgeType ), same( otherIdType ) ) ).thenReturn( Collections.singletonList( min ).iterator() );
+ assertFalse(shards.hasNext());
- long slice = cache.getSlice( scope, id, newTime, edgeType, otherIdType );
//we return the min UUID possible, all edges should start by writing to this edge
- assertEquals( min, slice );
/**
* Verify that we fired the audit
+ *
+ * TODO: use the Guava Ticker to make this happen
*/
- verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
+// verify(and allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
}
+
@Test
public void testRangeShard() {
@@ -165,6 +160,8 @@ public class NodeShardCacheTest {
final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
+ final TimeService time = mock(TimeService.class);
+
final Id id = createId( "test" );
final String edgeType = "edge";
@@ -175,150 +172,157 @@ public class NodeShardCacheTest {
/**
* Set our min mid and max
*/
- final long min = 0;
+ NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig, time );
+
+
+ final Shard minShard = new Shard(0, 0, true);
+ final Shard midShard = new Shard(10000, 1000, true);
+ final Shard maxShard = new Shard(20000, 2000, true);
+
+
+ /**
+ * Simulate returning all shards
+ */
+ final ShardEntryGroup minShardGroup = new ShardEntryGroup( 10000 );
+ minShardGroup.addShard( minShard );
+
+ final ShardEntryGroup midShardGroup = new ShardEntryGroup( 10000 );
+ midShardGroup.addShard( midShard );
- final long mid = 10000;
+ final ShardEntryGroup maxShardGroup = new ShardEntryGroup( 10000 );
+ maxShardGroup.addShard( maxShard );
- final long max = 20000;
- NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
+
+ DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( id, edgeType, otherIdType ) ;
/**
- * Simulate returning all shards
+ * Simulate returning all shard groups (max, mid and min).
*/
- when( allocation.getShards( same( scope ), same( id ), any( Optional.class ),
- same( edgeType ), same( otherIdType ) ) ).thenReturn( Arrays.asList( min, mid, max ).iterator() );
+ when( allocation
+ .getShards( same( scope ), any( Optional.class ), same( directedEdgeMeta ) ) )
+ //use "thenAnswer" so we always return the value, even if it's invoked more than 1 time.
+ .thenAnswer( new Answer<Iterator<ShardEntryGroup>>(){
- //check getting equal to our min, mid and max
+ @Override
+ public Iterator<ShardEntryGroup> answer( final InvocationOnMock invocationOnMock )
+ throws Throwable {
+ return Arrays.asList( maxShardGroup, midShardGroup, minShardGroup ).iterator();
+ }
+ });
- long slice = cache.getSlice( scope, id, min, edgeType, otherIdType );
+ //check getting equal to our min, mid and max
- //we return the min UUID possible, all edges should start by writing to this edge
- assertEquals( min, slice );
+ ShardEntryGroup writeShard = cache.getWriteShardGroup( scope, minShard.getShardIndex(), directedEdgeMeta );
- slice = cache.getSlice( scope, id, mid,
- edgeType, otherIdType );
+ assertSame(minShardGroup, writeShard);
- //we return the mid UUID possible, all edges should start by writing to this edge
- assertEquals( mid, slice );
+ Iterator<ShardEntryGroup>
+ groups = cache.getReadShardGroup( scope, minShard.getShardIndex(), directedEdgeMeta);
- slice = cache.getSlice( scope, id, max ,
- edgeType, otherIdType );
+ assertTrue(groups.hasNext());
+ assertSame("min shard expected", minShardGroup, groups.next());
- //we return the mid UUID possible, all edges should start by writing to this edge
- assertEquals( max, slice );
+ assertFalse(groups.hasNext());
- //now test in between
- slice = cache.getSlice( scope, id, min+1, edgeType, otherIdType );
- //we return the min UUID possible, all edges should start by writing to this edge
- assertEquals( min, slice );
- slice = cache.getSlice( scope, id, mid-1, edgeType, otherIdType );
+ //mid
+ writeShard = cache.getWriteShardGroup( scope, midShard.getShardIndex(), directedEdgeMeta );
+ assertSame(midShardGroup, writeShard);
- //we return the min UUID possible, all edges should start by writing to this edge
- assertEquals( min, slice );
+ groups = cache.getReadShardGroup( scope, midShard.getShardIndex(), directedEdgeMeta );
- slice = cache.getSlice( scope, id, mid+1, edgeType, otherIdType );
+ assertTrue(groups.hasNext());
+ assertSame("mid shard expected", midShardGroup, groups.next());
- //we return the mid UUID possible, all edges should start by writing to this edge
- assertEquals( mid, slice );
+ assertTrue(groups.hasNext());
- slice = cache.getSlice( scope, id, max-1, edgeType, otherIdType );
+ assertSame("min shard expected", minShardGroup, groups.next());
+ assertFalse(groups.hasNext());
- //we return the mid UUID possible, all edges should start by writing to this edge
- assertEquals( mid, slice );
- slice = cache.getSlice( scope, id, max, edgeType, otherIdType );
+ //max
+ writeShard = cache.getWriteShardGroup( scope, maxShard.getShardIndex(), directedEdgeMeta );
- //we return the mid UUID possible, all edges should start by writing to this edge
- assertEquals( max, slice );
+ assertSame(maxShardGroup, writeShard);
- /**
- * Verify that we fired the audit
- */
- verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
- }
+ groups = cache.getReadShardGroup( scope, maxShard.getShardIndex(), directedEdgeMeta );
- @Test
- public void testRangeShardIterator() {
+ assertTrue(groups.hasNext());
- final GraphFig graphFig = getFigMock();
+ assertSame("max shard expected", maxShardGroup, groups.next());
- final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
+ assertTrue(groups.hasNext());
- final Id id = createId( "test" );
+ assertSame("mid shard expected", midShardGroup, groups.next());
- final String edgeType = "edge";
- final String otherIdType = "type";
+ assertTrue(groups.hasNext());
+ assertSame("min shard expected", minShardGroup, groups.next());
- /**
- * Set our min mid and max
- */
- final long min = 1;
+ assertFalse(groups.hasNext());
- final long mid = 100;
+ //now test at mid +1 to ensure we get mid + min
+ writeShard = cache.getWriteShardGroup( scope, midShard.getShardIndex() + 1, directedEdgeMeta );
- final long max = 200;
+ assertSame(midShardGroup, writeShard);
- NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
+ groups = cache.getReadShardGroup( scope, midShard.getShardIndex() + 1, directedEdgeMeta );
+ assertTrue(groups.hasNext());
- /**
- * Simulate returning all shards
- */
- when( allocation.getShards( same( scope ), same( id ), any(Optional.class),
- same( edgeType ), same( otherIdType ) ) ).thenReturn( Arrays.asList( min, mid, max ).iterator() );
+ assertSame("mid shard expected", midShardGroup, groups.next());
+ assertTrue(groups.hasNext());
- //check getting equal to our min, mid and max
+ assertSame("min shard expected", minShardGroup, groups.next());
- Iterator<Long> slice =
- cache.getVersions( scope, id, max, edgeType, otherIdType );
+ assertFalse(groups.hasNext());
- assertEquals( max, slice.next().longValue() );
- assertEquals( mid, slice.next().longValue() );
- assertEquals( min, slice.next().longValue() );
+ //now test at mid -1 to ensure we get min
+ writeShard = cache.getWriteShardGroup( scope, midShard.getShardIndex() - 1, directedEdgeMeta );
- slice = cache.getVersions( scope, id, mid,
- edgeType, otherIdType );
+ assertSame(minShardGroup, writeShard);
- assertEquals( mid, slice.next().longValue() );
- assertEquals( min, slice.next().longValue() );
+ groups = cache.getReadShardGroup( scope, midShard.getShardIndex() - 1, directedEdgeMeta );
- slice = cache.getVersions( scope, id, min,
- edgeType, otherIdType );
- assertEquals( min, slice.next().longValue() );
+ assertTrue(groups.hasNext());
+ assertSame("min shard expected", minShardGroup, groups.next());
+
+ assertFalse(groups.hasNext());
}
+
+
+
private GraphFig getFigMock() {
final GraphFig graphFig = mock( GraphFig.class );
when( graphFig.getShardCacheSize() ).thenReturn( 1000l );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
new file mode 100644
index 0000000..b08da9a
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.Collection;
+import java.util.Set;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Test for the group functionality
+ */
+public class ShardEntryGroupTest {
+
+ @Test
+ public void singleEntry() {
+
+ final long delta = 10000;
+
+ Shard rootShard = new Shard( 0, 0, false );
+
+ ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+ final boolean result = shardEntryGroup.addShard( rootShard );
+
+ assertTrue( "Shard added", result );
+
+ assertFalse( "Single shard cannot be deleted", shardEntryGroup.canBeDeleted( rootShard ) );
+
+ assertNull( "No merge target found", shardEntryGroup.getCompactionTarget() );
+
+ assertFalse( "Merge cannot be run with a single shard", shardEntryGroup.shouldCompact( Long.MAX_VALUE ) );
+ }
+
+
+ @Test
+ public void allocatedWithinDelta() {
+
+ final long delta = 10000;
+
+ Shard firstShard = new Shard( 1000, 1000, false );
+
+ Shard secondShard = new Shard( 1000, 1001, false );
+
+
+ ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+ boolean result = shardEntryGroup.addShard( secondShard );
+
+ assertTrue( "Shard added", result );
+
+ result = shardEntryGroup.addShard( firstShard );
+
+ assertTrue( " Shard added", result );
+
+
+ assertFalse( "First shard cannot be deleted", shardEntryGroup.canBeDeleted( firstShard ) );
+
+ assertFalse( "Second shard cannot be deleted", shardEntryGroup.canBeDeleted( secondShard ) );
+
+ assertFalse( "Duplicate shard id cannot be deleted", shardEntryGroup.canBeDeleted( secondShard ) );
+
+ assertNull( "Can't compact, no min compacted shard present", shardEntryGroup.getCompactionTarget() );
+
+
+ //TODO, should this blow up in general? We don't have a compacted shard at the lower bounds, which shouldn't be allowed
+
+ }
+
+
+ @Test
+ public void testShardTarget() {
+
+ final long delta = 10000;
+
+ Shard compactedShard = new Shard( 0, 0, true );
+
+ Shard firstShard = new Shard( 1000, 1000, false );
+
+ Shard secondShard = new Shard( 1000, 1001, false );
+
+
+ ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+ boolean result = shardEntryGroup.addShard( secondShard );
+
+
+ assertTrue( "Shard added", result );
+
+ result = shardEntryGroup.addShard( firstShard );
+
+ assertTrue( "Shard added", result );
+
+ result = shardEntryGroup.addShard( compactedShard );
+
+ assertTrue( " Shard added", result );
+
+
+ assertFalse( "First shard cannot be deleted", shardEntryGroup.canBeDeleted( firstShard ) );
+
+ assertFalse( "Second shard cannot be deleted", shardEntryGroup.canBeDeleted( secondShard ) );
+
+ assertFalse( "Duplicate shard id cannot be deleted", shardEntryGroup.canBeDeleted( secondShard ) );
+
+ assertEquals( "Min compaction target found", firstShard, shardEntryGroup.getCompactionTarget() );
+
+ //shouldn't return true, since we haven't passed delta time in the second shard
+ assertFalse( "Merge cannot be run within min time",
+ shardEntryGroup.shouldCompact( firstShard.getCreatedTime() + delta ) );
+
+ //shouldn't return true, since we haven't passed delta time in the second shard
+ assertFalse( "Merge cannot be run within min time",
+ shardEntryGroup.shouldCompact( secondShard.getCreatedTime() + delta ) );
+
+ //we haven't passed the delta in the neighbor that would be our source, shard2, we shouldn't return true
+ //we read from shard2 and write to shard1
+ assertFalse( "Merge cannot be run with after min time",
+ shardEntryGroup.shouldCompact( firstShard.getCreatedTime() + delta + 1 ) );
+
+ assertTrue( "Merge should be run with after min time",
+ shardEntryGroup.shouldCompact( secondShard.getCreatedTime() + delta + 1 ) );
+ }
+
+
+ @Test
+ public void multipleShardGroups() {
+
+ final long delta = 10000;
+
+ Shard firstShard = new Shard( 1000, 10000, false );
+
+ Shard secondShard = new Shard( 999, 9000, false );
+
+ Shard compactedShard1 = new Shard( 900, 8000, true );
+
+ Shard compactedShard2 = new Shard( 800, 7000, true );
+
+
+ ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+ boolean result = shardEntryGroup.addShard( firstShard );
+
+ assertTrue( "Shard added", result );
+
+ result = shardEntryGroup.addShard( secondShard );
+
+ assertTrue( " Shard added", result );
+
+ result = shardEntryGroup.addShard( compactedShard1 );
+
+ assertTrue( "Shard added", result );
+
+ result = shardEntryGroup.addShard( compactedShard2 );
+
+ assertFalse( "Shouldn't add since it's compacted", result );
+
+ ShardEntryGroup secondGroup = new ShardEntryGroup( delta );
+
+ result = secondGroup.addShard( compactedShard2 );
+
+ assertTrue( "Added successfully", result );
+ }
+
+
+ @Test
+ public void boundShardGroup() {
+
+ final long delta = 10000;
+
+ Shard firstShard = new Shard( 1000, 10000, false );
+
+ Shard secondShard = new Shard( 999, 9000, false );
+
+ Shard compactedShard1 = new Shard( 900, 8000, true );
+
+
+ ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+ boolean result = shardEntryGroup.addShard( firstShard );
+
+ assertTrue( "Shard added", result );
+
+ result = shardEntryGroup.addShard( secondShard );
+
+ assertTrue( " Shard added", result );
+
+ result = shardEntryGroup.addShard( compactedShard1 );
+
+ assertTrue( "Shard added", result );
+
+
+ assertTrue( "Shard can be deleted", shardEntryGroup.canBeDeleted( firstShard ) );
+
+ assertFalse( "Compaction shard shard cannot be deleted", shardEntryGroup.canBeDeleted( secondShard ) );
+
+ assertEquals( "Same shard for merge target", secondShard, shardEntryGroup.getCompactionTarget() );
+
+ //shouldn't return true, since we haven't passed delta time in the second shard
+ assertFalse( "Merge cannot be run within min time",
+ shardEntryGroup.shouldCompact( firstShard.getCreatedTime() + delta ) );
+
+ //shouldn't return true, since we haven't passed delta time in the second shard
+ assertFalse( "Merge cannot be run within min time",
+ shardEntryGroup.shouldCompact( secondShard.getCreatedTime() + delta ) );
+
+
+ assertFalse( "Merge cannot be run within min time",
+ shardEntryGroup.shouldCompact( secondShard.getCreatedTime() + delta + 1 ) );
+
+ assertTrue( "Merge should be run with after min time", shardEntryGroup.shouldCompact( firstShard.getCreatedTime() + delta + 1 ) );
+ }
+
+
+ /**
+ * Ensures that we read from all shards in the group (even the compacted one)
+ */
+ @Test
+ public void getAllReadShards() {
+
+ final long delta = 10000;
+
+ Shard firstShard = new Shard( 1000, 10000, false );
+
+ Shard secondShard = new Shard( 999, 9000, false );
+
+ Shard compactedShard1 = new Shard( 900, 8000, true );
+
+
+ ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+ boolean result = shardEntryGroup.addShard( firstShard );
+
+ assertTrue( "Shard added", result );
+
+ result = shardEntryGroup.addShard( secondShard );
+
+ assertTrue( " Shard added", result );
+
+ result = shardEntryGroup.addShard( compactedShard1 );
+
+ assertTrue( "Shard added", result );
+
+ Collection<Shard> readShards = shardEntryGroup.getReadShards();
+
+ assertEquals("Shard size correct", 3, readShards.size());
+ //each shard added above must be individually present, not just the first one
+ assertTrue("First shard present", readShards.contains( firstShard ) );
+
+ assertTrue("Second shard present", readShards.contains( secondShard ) );
+
+ assertTrue("Third shard present", readShards.contains( compactedShard1 ) );
+
+ }
+
+
+ /**
+ * Ensures that we write to all shards until the delta has passed, then only to the compaction target
+ */
+ @Test
+ public void getAllWriteShardsNotPastCompaction() {
+
+ final long delta = 10000;
+
+ Shard firstShard = new Shard( 1000, 10000, false );
+
+ Shard secondShard = new Shard( 999, 9000, false );
+
+ Shard compactedShard = new Shard( 900, 8000, true );
+
+
+ ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+ boolean result = shardEntryGroup.addShard( firstShard );
+
+ assertTrue( "Shard added", result );
+
+ result = shardEntryGroup.addShard( secondShard );
+
+ assertTrue( " Shard added", result );
+
+ result = shardEntryGroup.addShard( compactedShard );
+
+ assertTrue( "Shard added", result );
+
+
+
+ Collection<Shard> writeShards = shardEntryGroup.getWriteShards( firstShard.getCreatedTime() + delta );
+
+ assertEquals("Shard size correct", 3, writeShards.size());
+
+ assertTrue("First shard present", writeShards.contains( firstShard ) );
+
+ assertTrue("Second shard present", writeShards.contains( secondShard ) );
+
+ assertTrue("Third shard present", writeShards.contains( compactedShard ) );
+
+
+
+ writeShards = shardEntryGroup.getWriteShards(secondShard.getCreatedTime()+delta);
+
+ assertEquals("Shard size correct", 3, writeShards.size());
+
+ assertTrue("First shard present", writeShards.contains( firstShard ) );
+
+ assertTrue("Second shard present", writeShards.contains( secondShard ) );
+
+ assertTrue("Third shard present", writeShards.contains( compactedShard ) );
+
+
+ /**
+ * Not the max created timestamp, shouldn't return less than all shards
+ */
+ writeShards = shardEntryGroup.getWriteShards(secondShard.getCreatedTime() +1 + delta);
+
+ assertEquals("Shard size correct", 3, writeShards.size());
+
+ assertTrue("First shard present", writeShards.contains( firstShard ) );
+
+ assertTrue("Second shard present", writeShards.contains( secondShard ) );
+
+ assertTrue("Third shard present", writeShards.contains( compactedShard ) );
+
+
+
+ assertEquals("Compaction target correct", secondShard, shardEntryGroup.getCompactionTarget());
+
+ writeShards = shardEntryGroup.getWriteShards(firstShard.getCreatedTime() +1 + delta);
+
+ assertEquals("Shard size correct", 1, writeShards.size());
+
+
+ assertTrue("Second shard present", writeShards.contains( secondShard ) );
+
+ }
+
+
+ @Test(expected=IllegalArgumentException.class)
+ public void failsInsertionOrder() {
+
+ final long delta = 10000;
+
+ Shard secondShard = new Shard(20000, 10000, false);
+
+ Shard firstShard = new Shard(10000 , 10000, false );
+
+ Shard rootShard = new Shard( 0, 0, false );
+
+ ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+ boolean result = shardEntryGroup.addShard( secondShard );
+
+ assertTrue( "Shard added", result );
+
+ result = shardEntryGroup.addShard( rootShard );
+
+ assertTrue( "Shard added", result );
+
+ //this should blow up, we can't add a shard in the middle, it must always be less than the current min
+
+ shardEntryGroup.addShard( firstShard );
+
+
+ }
+
+
+
+ @Test
+ public void shardEntryAddList() {
+
+ final long delta = 10000;
+
+ Shard highShard = new Shard( 30000, 1000, false );
+
+ Shard midShard = new Shard( 20000, 1000, true );
+
+ Shard lowShard = new Shard( 10000, 1000, false);
+
+ ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+ boolean result = shardEntryGroup.addShard( highShard );
+
+ assertTrue( "Shard added", result );
+
+ result = shardEntryGroup.addShard( midShard );
+
+ assertTrue( "Shard added", result );
+
+ result = shardEntryGroup.addShard( lowShard );
+
+ assertFalse( "Shard added", result );
+ }
+
+
+
+
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
index da19ce5..fd8ff26 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
@@ -39,12 +39,17 @@ import org.junit.Test;
import org.safehaus.guicyfig.Bypass;
import org.safehaus.guicyfig.OptionState;
import org.safehaus.guicyfig.Overrides;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
@@ -68,6 +73,7 @@ import static org.mockito.Mockito.when;
public class NodeShardApproximationTest {
+ private static final Logger LOG = LoggerFactory.getLogger( NodeShardApproximationTest.class );
private GraphFig graphFig;
@@ -92,35 +98,38 @@ public class NodeShardApproximationTest {
when( graphFig.getShardCacheSize() ).thenReturn( 10000l );
when( graphFig.getShardSize() ).thenReturn( 250000l );
+ when( graphFig.getCounterFlushQueueSize() ).thenReturn( 10000 );
nodeShardCounterSerialization = mock( NodeShardCounterSerialization.class );
- when(nodeShardCounterSerialization.flush( any(Counter.class) )).thenReturn( mock( MutationBatch.class) );
-
+ when( nodeShardCounterSerialization.flush( any( Counter.class ) ) ).thenReturn( mock( MutationBatch.class ) );
timeService = mock( TimeService.class );
- when(timeService.getCurrentTime()).thenReturn( System.currentTimeMillis() );
+ when( timeService.getCurrentTime() ).thenReturn( System.currentTimeMillis() );
}
@Test
- public void testSingleShard() {
-
+ public void testSingleShard() throws InterruptedException {
- when(graphFig.getCounterFlushCount()).thenReturn( 100000l );
+ when( graphFig.getCounterFlushCount() ).thenReturn( 100000l );
NodeShardApproximation approximation =
new NodeShardApproximationImpl( graphFig, nodeShardCounterSerialization, timeService );
final Id id = createId( "test" );
- final long shardId = 0l;
+ final Shard shard = new Shard(0, 0, true);
final String type = "type";
final String type2 = "subType";
- long count = approximation.getCount( scope, id, shardId, type, type2 );
+ final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( id, type, type2 );
+
+ long count = approximation.getCount( scope, shard, directedEdgeMeta);
+
+ waitForFlush( approximation );
assertEquals( 0, count );
}
@@ -130,8 +139,6 @@ public class NodeShardApproximationTest {
public void testSingleShardMultipleThreads() throws ExecutionException, InterruptedException {
-
-
NodeShardCounterSerialization serialization = new TestNodeShardCounterSerialization();
final NodeShardApproximation approximation =
@@ -144,8 +151,10 @@ public class NodeShardApproximationTest {
final Id id = createId( "test" );
final String type = "type";
final String type2 = "subType";
- final long shardId = 10000;
+ final Shard shard = new Shard(10000, 0, true);
+
+ final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( id, type, type2 );
ExecutorService executor = Executors.newFixedThreadPool( workers );
@@ -158,7 +167,7 @@ public class NodeShardApproximationTest {
public Long call() throws Exception {
for ( int i = 0; i < increments; i++ ) {
- approximation.increment( scope, id, shardId, 1, type, type2 );
+ approximation.increment( scope, shard, 1, directedEdgeMeta );
}
return 0l;
@@ -169,24 +178,25 @@ public class NodeShardApproximationTest {
}
-
for ( Future<Long> future : futures ) {
- future.get();
+ future.get();
}
-
+ waitForFlush( approximation );
//get our count. It should be accurate b/c we only have 1 instance
- final long returnedCount = approximation.getCount( scope, id, shardId, type, type2);
+ final long returnedCount = approximation.getCount( scope, shard, directedEdgeMeta );
final long expected = workers * increments;
- assertEquals(expected, returnedCount);
-
+ assertEquals( expected, returnedCount );
+ //test we get nothing with the other type
+ final long emptyCount = approximation.getCount( scope, shard, DirectedEdgeMeta.fromSourceNodeTargetType( id, type, type2 ));
+ assertEquals( 0, emptyCount );
}
@@ -195,8 +205,6 @@ public class NodeShardApproximationTest {
public void testMultipleShardMultipleThreads() throws ExecutionException, InterruptedException {
-
-
NodeShardCounterSerialization serialization = new TestNodeShardCounterSerialization();
final NodeShardApproximation approximation =
@@ -210,27 +218,32 @@ public class NodeShardApproximationTest {
final String type = "type";
final String type2 = "subType";
- final AtomicLong shardIdCounter = new AtomicLong( );
+ final AtomicLong shardIdCounter = new AtomicLong();
+
+
+ final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( id, type, type2 );
ExecutorService executor = Executors.newFixedThreadPool( workers );
- List<Future<Long>> futures = new ArrayList<>( workers );
+ List<Future<Shard>> futures = new ArrayList<>( workers );
for ( int i = 0; i < workers; i++ ) {
- final Future<Long> future = executor.submit( new Callable<Long>() {
+ final Future<Shard> future = executor.submit( new Callable<Shard>() {
@Override
- public Long call() throws Exception {
+ public Shard call() throws Exception {
final long threadShardId = shardIdCounter.incrementAndGet();
+ final Shard shard = new Shard( threadShardId, 0, true );
+
for ( int i = 0; i < increments; i++ ) {
- approximation.increment( scope, id, threadShardId, 1, type, type2 );
+ approximation.increment( scope, shard, 1, directedEdgeMeta );
}
- return threadShardId;
+ return shard;
}
} );
@@ -238,29 +251,41 @@ public class NodeShardApproximationTest {
}
+ for ( Future<Shard> future : futures ) {
+ final Shard shardId = future.get();
- for ( Future<Long> future : futures ) {
- final long shardId = future.get();
+ waitForFlush( approximation );
- final long returnedCount = approximation.getCount( scope, id, shardId, type, type2);
+ final long returnedCount = approximation.getCount( scope, shardId, directedEdgeMeta);
- assertEquals(increments, returnedCount);
+ assertEquals( increments, returnedCount );
}
+ }
+
+
+ private void waitForFlush( NodeShardApproximation approximation ) throws InterruptedException {
+ approximation.beginFlush();
+ while ( approximation.flushPending() ) {
+ LOG.info("Waiting on beginFlush to complete");
+ Thread.sleep( 100 );
+ }
}
+
/**
* These are created b/c we can't use Mockito. It OOM's with keeping track of all the mock invocations
*/
- private static class TestNodeShardCounterSerialization implements NodeShardCounterSerialization{
+ private static class TestNodeShardCounterSerialization implements NodeShardCounterSerialization {
private Counter copy = new Counter();
+
@Override
public MutationBatch flush( final Counter counter ) {
copy.merge( counter );
@@ -281,7 +306,6 @@ public class NodeShardApproximationTest {
}
-
/**
* Simple test mutation to no-op during tests
*/
@@ -415,14 +439,14 @@ public class NodeShardApproximationTest {
}
-
- private static class TestGraphFig implements GraphFig{
+ private static class TestGraphFig implements GraphFig {
@Override
public int getScanPageSize() {
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
+
@Override
public int getRepairConcurrentSize() {
return 0; //To change body of implemented methods use File | Settings | File Templates.
@@ -442,6 +466,12 @@ public class NodeShardApproximationTest {
@Override
+ public long getShardMinDelta() {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+
+ @Override
public long getShardCacheSize() {
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
@@ -460,6 +490,12 @@ public class NodeShardApproximationTest {
@Override
+ public int getCounterFlushQueueSize() {
+ return 10000;
+ }
+
+
+ @Override
public void addPropertyChangeListener( final PropertyChangeListener propertyChangeListener ) {
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -555,7 +591,8 @@ public class NodeShardApproximationTest {
}
}
- private static class TestTimeService implements TimeService{
+
+ private static class TestTimeService implements TimeService {
@Override
public long getCurrentTime() {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java
index 9968f67..9edc66a 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java
@@ -35,6 +35,9 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
@@ -96,11 +99,11 @@ public class NodeShardCounterSerializationTest {
final Id id = createId( "test" );
- ShardKey key1 = new ShardKey( scope, id, 0, "type1" );
+ ShardKey key1 = new ShardKey( scope, new Shard(0, 0, false), DirectedEdgeMeta.fromSourceNode( id, "type1" ) );
- ShardKey key2 = new ShardKey( scope, id, 0, "type2" );
+ ShardKey key2 = new ShardKey( scope, new Shard(0, 0, false), DirectedEdgeMeta.fromSourceNode( id, "type2" ) );
- ShardKey key3 = new ShardKey( scope, id, 1, "type1" );
+ ShardKey key3 = new ShardKey( scope, new Shard(1, 0, false), DirectedEdgeMeta.fromSourceNode( id, "type1" ) );
Counter counter = new Counter();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
new file mode 100644
index 0000000..5b20647
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
@@ -0,0 +1,232 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+
+/** Tests for ShardEntryGroupIterator: empty input, a single shard, and a shard sequence split into three groups. */
+public class ShardEntryGroupIteratorTest {
+
+ @Test(expected = IllegalArgumentException.class)
+ public void noShards(){
+ final long delta = 10000;
+ final Iterator<Shard> noShards = Collections.<Shard>emptyList().iterator();
+
+ //should blow up, our iterator is empty; the constructor is the only call, so it is what must throw
+ new ShardEntryGroupIterator(noShards, delta);
+ }
+
+ @Test
+ public void existingSingleShard(){
+ // a lone shard should come back as exactly one group that serves both reads and writes
+ final Shard minShard = new Shard(0, 0, true);
+ final long delta = 10000;
+ final Iterator<Shard> singleShard = Collections.singleton( minShard ).iterator();
+
+ ShardEntryGroupIterator entryGroupIterator = new ShardEntryGroupIterator(singleShard, delta);
+
+ assertTrue("Root shard always present", entryGroupIterator.hasNext());
+
+ ShardEntryGroup group = entryGroupIterator.next();
+
+ assertNotNull("Group returned", group);
+
+ Collection<Shard> readShards = group.getReadShards();
+
+ assertEquals("Exactly one read shard", 1, readShards.size());
+
+ assertTrue("Min shard present", readShards.contains( minShard ));
+
+ // NOTE(review): the getWriteShards argument is presumably a timestamp - confirm against ShardEntryGroup
+ Collection<Shard> writeShards = group.getWriteShards( 0 );
+
+ assertEquals("Exactly one write shard", 1, writeShards.size());
+
+ assertTrue("Min shard present", writeShards.contains( minShard ));
+
+ // the same single shard is expected at the largest possible argument value
+ writeShards = group.getWriteShards( Long.MAX_VALUE );
+
+ assertEquals("Exactly one write shard", 1, writeShards.size());
+
+ assertTrue("Min shard present", writeShards.contains( minShard ));
+
+
+ }
+
+
+ /**
+ * Tests the iterator constructs boundaries between groups correctly. In a "real" runtime environment, I expect
+ * that only the last 1 or 2 groups will actually have more than 1 entry.
+ */
+ @Test
+ public void boundedShardSets(){
+ // NOTE(review): only the first shard of each group is built with 'true' - presumably the compacted flag; confirm against Shard
+ /**
+ * Lowest shard group
+ */
+ final Shard shardGroup1Shard1 = new Shard(0, 0, true);
+
+ final Shard shardGroup1Shard2 = new Shard(10000, 100, false);
+
+ final Shard shardGroup1Shard3 = new Shard(20000, 200, false);
+
+
+ /**
+ * Middle shard group
+ */
+ final Shard shardGroup2Shard1 = new Shard(30000, 300, true);
+
+ final Shard shardGroup2Shard2 = new Shard(40000, 400, false);
+
+
+ /**
+ * Highest shard group
+ */
+
+ final Shard shardGroup3Shard1 = new Shard(50000, 500, true);
+
+ final Shard shardGroup3Shard2 = new Shard(60000, 600, false);
+
+ final Shard shardGroup3Shard3 = new Shard(70000, 700, false);
+
+
+
+ // the source iterator lists the shards from shardGroup3Shard3 down to shardGroup1Shard1, one group per row
+ final long delta = 10000;
+ final Iterator<Shard> shards = Arrays.asList(
+ shardGroup3Shard3, shardGroup3Shard2, shardGroup3Shard1,
+ shardGroup2Shard2, shardGroup2Shard1,
+ shardGroup1Shard3, shardGroup1Shard2, shardGroup1Shard1 ).iterator();
+
+ ShardEntryGroupIterator entryGroupIterator = new ShardEntryGroupIterator(shards, delta);
+
+ assertTrue("max group present", entryGroupIterator.hasNext());
+
+ ShardEntryGroup group = entryGroupIterator.next();
+
+ assertNotNull("Group returned", group);
+
+ Collection<Shard> readShards = group.getReadShards();
+
+ assertEquals("Max group has 3 read shards", 3, readShards.size());
+
+ assertTrue("shardGroup3Shard3 shard present", readShards.contains( shardGroup3Shard3 ));
+
+ assertTrue("shardGroup3Shard2 shard present", readShards.contains( shardGroup3Shard2 ));
+
+ assertTrue("shardGroup3Shard1 shard present", readShards.contains( shardGroup3Shard1 ));
+
+
+ Collection<Shard> writeShards = group.getWriteShards( 0 );
+
+ assertEquals("Max group has 3 write shards", 3, writeShards.size());
+
+ assertTrue("shardGroup3Shard3 shard present", writeShards.contains( shardGroup3Shard3 ));
+
+ assertTrue("shardGroup3Shard2 shard present", writeShards.contains( shardGroup3Shard2 ));
+
+ assertTrue("shardGroup3Shard1 shard present", writeShards.contains( shardGroup3Shard1 ));
+
+
+
+
+ // second group returned by the iterator: the middle one
+ assertTrue("middle group present", entryGroupIterator.hasNext());
+
+ group = entryGroupIterator.next();
+
+ assertNotNull("Group returned", group);
+
+ readShards = group.getReadShards();
+
+ assertEquals("Middle group has 2 read shards", 2, readShards.size());
+
+ assertTrue("shardGroup2Shard1 shard present", readShards.contains( shardGroup2Shard1 ));
+
+ assertTrue("shardGroup2Shard2 shard present", readShards.contains( shardGroup2Shard2 ));
+
+
+
+ writeShards = group.getWriteShards( 0 );
+
+ assertEquals("Middle group has 2 write shards", 2, writeShards.size());
+
+ assertTrue("shardGroup2Shard1 shard present", writeShards.contains( shardGroup2Shard1 ));
+
+ assertTrue("shardGroup2Shard2 shard present", writeShards.contains( shardGroup2Shard2 ));
+
+
+
+
+
+ // third group returned by the iterator: the lowest one
+ assertTrue("min group present", entryGroupIterator.hasNext());
+
+ group = entryGroupIterator.next();
+
+ assertNotNull("Group returned", group);
+
+ readShards = group.getReadShards();
+
+ assertEquals("Min group has 3 read shards", 3, readShards.size());
+
+ assertTrue("shardGroup1Shard3 shard present", readShards.contains( shardGroup1Shard3 ));
+
+ assertTrue("shardGroup1Shard2 shard present", readShards.contains( shardGroup1Shard2 ));
+
+ assertTrue("shardGroup1Shard1 shard present", readShards.contains( shardGroup1Shard1 ));
+
+
+
+ writeShards = group.getWriteShards( 0 );
+
+ assertEquals("Min group has 3 write shards", 3, writeShards.size());
+
+ assertTrue("shardGroup1Shard3 shard present", writeShards.contains( shardGroup1Shard3 ));
+
+ assertTrue("shardGroup1Shard2 shard present", writeShards.contains( shardGroup1Shard2 ));
+
+ assertTrue("shardGroup1Shard1 shard present", writeShards.contains( shardGroup1Shard1 ));
+
+
+
+
+
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 4716533..9565543 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -55,7 +55,7 @@
<junit.version>4.11</junit.version>
<kryo-serializers.version>0.26</kryo-serializers.version>
<log4j.version>1.2.17</log4j.version>
- <rx.version>0.19.0</rx.version>
+ <rx.version>0.19.6</rx.version>
<slf4j.version>1.7.2</slf4j.version>
<surefire.version>2.16</surefire.version>