You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/08/27 02:29:18 UTC
usergrid git commit: Updated tests and logic
Repository: usergrid
Updated Branches:
refs/heads/USERGRID-909 5f7387cd1 -> 101e9f96d
Updated tests and logic
Lowered audit chance to 1%
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/101e9f96
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/101e9f96
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/101e9f96
Branch: refs/heads/USERGRID-909
Commit: 101e9f96df09000222bb47a98b06bff27dd41bac
Parents: 5f7387c
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Aug 26 18:29:15 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Aug 26 18:29:15 2015 -0600
----------------------------------------------------------------------
.../usergrid/persistence/graph/GraphFig.java | 13 +-
.../impl/shard/EdgeShardSerialization.java | 18 +-
.../impl/shard/NodeShardAllocation.java | 4 +-
.../impl/shard/ShardConsistency.java | 5 -
.../shard/impl/EdgeShardSerializationImpl.java | 26 +-
.../shard/impl/NodeShardAllocationImpl.java | 126 +++--
.../impl/shard/impl/NodeShardCacheImpl.java | 6 +-
.../impl/shard/impl/ShardConsistencyImpl.java | 6 +-
.../graph/GraphManagerShardConsistencyIT.java | 40 +-
.../impl/shard/NodeShardAllocationTest.java | 5 +-
.../impl/shard/NodeShardCacheTest.java | 4 +-
.../shard/count/NodeShardApproximationTest.java | 471 -------------------
12 files changed, 114 insertions(+), 610 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index d0df2eb..0517bf9 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -80,7 +80,11 @@ public interface GraphFig extends GuicyFig {
int getRepairConcurrentSize();
- @Default( ".10" )
+ /**
+ * A 1% repair chance. On average we'll check to repair on 1 out of every 100 reads
+ * @return
+ */
+ @Default( ".01" )
@Key( SHARD_REPAIR_CHANCE )
double getShardRepairChance();
@@ -120,12 +124,5 @@ public interface GraphFig extends GuicyFig {
@Default( "CL_LOCAL_QUORUM" )
@Key( SHARD_READ_CONSISTENCY )
String getShardReadConsistency();
-
- /**
- * Get the consistency level for performing a shard audit
- */
- @Default( "CL_EACH_QUORUM" )
- @Key( SHARD_AUDIT_CONSISTENCY )
- String getShardAuditConsistency();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
index d8c561f..63b7594 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
@@ -40,7 +40,7 @@ public interface EdgeShardSerialization extends Migration{
* @param shard The shard to write
* @param directedEdgeMeta The edge meta data to use
*/
- public MutationBatch writeShardMeta( ApplicationScope scope, Shard shard, DirectedEdgeMeta directedEdgeMeta );
+ MutationBatch writeShardMeta( ApplicationScope scope, Shard shard, DirectedEdgeMeta directedEdgeMeta );
/**
* Get an iterator of all meta data and types. Returns a range from High to low. Only reads the local region
@@ -49,20 +49,10 @@ public interface EdgeShardSerialization extends Migration{
* @param directedEdgeMeta The edge meta data to use
* @return
*/
- public Iterator<Shard> getShardMetaDataLocal( ApplicationScope scope, Optional<Shard> start,
- DirectedEdgeMeta directedEdgeMeta );
-
-
- /**
- * Get an iterator of all meta data and types. Returns a range from High to low. Reads quorum of all regions
- * @param scope The organization scope
- * @param start The shard time to start seeking from. Values <= this value will be returned.
- * @param directedEdgeMeta The edge meta data to use
- * @return
- */
- Iterator<Shard> getShardMetaDataAudit( ApplicationScope scope, Optional<Shard> start,
+ Iterator<Shard> getShardMetaDataLocal( ApplicationScope scope, Optional<Shard> start,
DirectedEdgeMeta directedEdgeMeta );
+
/**
* Remove the shard from the edge meta data from the types.
@@ -71,6 +61,6 @@ public interface EdgeShardSerialization extends Migration{
* @param directedEdgeMeta The edge meta data to use
* @return
*/
- public MutationBatch removeShardMeta( ApplicationScope scope, Shard shard, DirectedEdgeMeta directedEdgeMeta );
+ MutationBatch removeShardMeta( ApplicationScope scope, Shard shard, DirectedEdgeMeta directedEdgeMeta );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
index 5039b35..e7be0b0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
@@ -42,8 +42,8 @@ public interface NodeShardAllocation {
* @param directedEdgeMeta The directed edge metadata to use
* @return A list of all shards <= the current shard. This will always return 0l if no shards are allocated
*/
- Iterator<ShardEntryGroup> getShards( final ApplicationScope scope, Optional<Shard> maxShardId,
- final DirectedEdgeMeta directedEdgeMeta );
+ Iterator<ShardEntryGroup> getShardsLocal( final ApplicationScope scope, Optional<Shard> maxShardId,
+ final DirectedEdgeMeta directedEdgeMeta );
/**
http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardConsistency.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardConsistency.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardConsistency.java
index 5c52af6..6f992eb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardConsistency.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardConsistency.java
@@ -38,9 +38,4 @@ public interface ShardConsistency {
*/
ConsistencyLevel getShardReadConsistency();
- /**
- * Get the consistency level for performing a shard audit
- * @return
- */
- ConsistencyLevel getShardAuditConsistency();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
index 8ccf809..d6cd998 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
@@ -116,28 +116,6 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
public Iterator<Shard> getShardMetaDataLocal( final ApplicationScope scope, final Optional<Shard> start,
final DirectedEdgeMeta metaData ) {
- return getShardMetaDataInternal( scope, start, metaData, shardConsistency.getShardReadConsistency() );
- }
-
-
- @Override
- public Iterator<Shard> getShardMetaDataAudit( final ApplicationScope scope, final Optional<Shard> start,
- final DirectedEdgeMeta directedEdgeMeta ) {
- return getShardMetaDataInternal( scope, start, directedEdgeMeta, shardConsistency.getShardAuditConsistency() );
- }
-
-
- /**
- * Get the shard meta data, allowing the caller to specify the consistency level
- * @param scope
- * @param start
- * @param metaData
- * @param consistencyLevel
- * @return
- */
- private Iterator<Shard> getShardMetaDataInternal( final ApplicationScope scope, final Optional<Shard> start,
- final DirectedEdgeMeta metaData,
- final ConsistencyLevel consistencyLevel ) {
ValidationUtils.validateApplicationScope( scope );
GraphValidation.validateDirectedEdgeMeta( metaData );
@@ -162,7 +140,7 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
final RowQuery<ScopedRowKey<DirectedEdgeMeta>, Long> query =
- keyspace.prepareQuery( EDGE_SHARDS ).setConsistencyLevel( consistencyLevel ).getKey( rowKey )
+ keyspace.prepareQuery( EDGE_SHARDS ).setConsistencyLevel( shardConsistency.getShardReadConsistency() ).getKey( rowKey )
.autoPaginate( true ).withColumnRange( rangeBuilder.build() );
@@ -170,6 +148,8 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
}
+
+
@Override
public MutationBatch removeShardMeta( final ApplicationScope scope, final Shard shard,
final DirectedEdgeMeta metaData ) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 592d308..eced1a8 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -59,7 +59,7 @@ import com.netflix.astyanax.util.TimeUUIDUtils;
public class NodeShardAllocationImpl implements NodeShardAllocation {
- private static final Logger LOG = LoggerFactory.getLogger( NodeShardAllocationImpl.class );
+ private static final Logger logger = LoggerFactory.getLogger( NodeShardAllocationImpl.class );
private static final Shard MIN_SHARD = new Shard( 0, 0, true );
@@ -91,8 +91,8 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
@Override
- public Iterator<ShardEntryGroup> getShards( final ApplicationScope scope, final Optional<Shard> maxShardId,
- final DirectedEdgeMeta directedEdgeMeta ) {
+ public Iterator<ShardEntryGroup> getShardsLocal( final ApplicationScope scope, final Optional<Shard> maxShardId,
+ final DirectedEdgeMeta directedEdgeMeta ) {
ValidationUtils.validateApplicationScope( scope );
Preconditions.checkNotNull( maxShardId, "maxShardId cannot be null" );
@@ -128,20 +128,30 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
@Override
- public boolean auditShard( final ApplicationScope scope, final ShardEntryGroup shardEntryGroup,
+ public boolean auditShard( final ApplicationScope scope, final ShardEntryGroup lastLoadedShardEntryGroup,
final DirectedEdgeMeta directedEdgeMeta ) {
ValidationUtils.validateApplicationScope( scope );
- GraphValidation.validateShardEntryGroup( shardEntryGroup );
+ GraphValidation.validateShardEntryGroup( lastLoadedShardEntryGroup );
GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta );
- Preconditions.checkNotNull( shardEntryGroup, "shardEntryGroup cannot be null" );
+ Preconditions.checkNotNull( lastLoadedShardEntryGroup, "lastLoadedShardEntryGroup cannot be null" );
//we have to read our state from cassandra first to ensure we have an up to date view from other regions
+ //read our shard entry groups to ensure we have a current state
+ final Iterator<ShardEntryGroup> shardEntryGroupIterator =
+ getCurrentStateIterator( scope, lastLoadedShardEntryGroup, directedEdgeMeta );
+ if ( !shardEntryGroupIterator.hasNext() ) {
+ logger.warn( "Could not read our shard entries. Our state is unknown, short circuiting" );
+ return false;
+ }
+
+ final ShardEntryGroup shardEntryGroup = shardEntryGroupIterator.next();
+
/**
* Nothing to do, it's been created very recently, we don't create a new one
*/
@@ -155,37 +165,37 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
}
- /**
- * Check the min shard in our system
- */
- final Shard shard = shardEntryGroup.getMinShard();
+ final Shard minShard = shardEntryGroup.getMinShard();
/**
* Check out if we have a count for our shard allocation
*/
- final long count = nodeShardApproximation.getCount( scope, shard, directedEdgeMeta );
+ final long count = nodeShardApproximation.getCount( scope, minShard, directedEdgeMeta );
final long shardSize = graphFig.getShardSize();
if ( count < shardSize ) {
+ logger.debug( "Our count is less than our shard size, not allocating a new shard" );
return false;
}
- if ( LOG.isDebugEnabled() ) {
- LOG.debug( "Count of {} has exceeded shard config of {} will begin compacting", count, shardSize );
- }
+
+ logger.debug( "Count of {} has exceeded shard config of {} will begin compacting", count, shardSize );
/**
- * We want to allocate a new shard as close to the max value as possible. This way if we're filling up a shard rapidly, we split it near the head of the values.
- * Further checks to this group will result in more splits, similar to creating a tree type structure and splitting each node.
+ * We want to allocate a new shard as close to the max value as possible. This way if we're filling up a
+ * shard rapidly, we split it near the head of the values.
*
- * This means that the lower shard can be re-split later if it is still too large. We do the division to truncate
- * to a split point < what our current max is that would be approximately be our pivot ultimately if we split from the
- * lower bound and moved forward. Doing this will stop the current shard from expanding and avoid a point where we cannot
- * ultimately compact to the correct shard size.
+ * Further checks to this group will result in more splits, similar to creating a tree type structure and
+ * splitting each node.
+ *
+ * This means that the lower shard can be re-split later if it is still too large. We do the division to
+ * truncate to a split point < what our current max is that would approximately be our pivot ultimately
+ * if we split from the lower bound and moved forward. Doing this will stop the current shard from expanding
+ * and avoid a point where we cannot ultimately compact to the correct shard size due to excessive tombstones.
*/
@@ -199,7 +209,8 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
if ( !edges.hasNext() ) {
- LOG.warn( "Tried to allocate a new shard for edge meta data {}, " + "but no max value could be found in that row",
+ logger.warn(
+ "Tried to allocate a new shard for edge meta data {}, " + "but no max value could be found in that row",
directedEdgeMeta );
return false;
}
@@ -230,7 +241,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
* Sanity check in case our counters become severely out of sync with our edge state in cassandra.
*/
if ( marked == null ) {
- LOG.warn( "Incorrect shard count for shard group {}, ignoring", shardEntryGroup );
+ logger.warn( "Incorrect shard count for shard group {}, ignoring", shardEntryGroup );
return false;
}
@@ -238,7 +249,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
final Shard newShard = new Shard( marked.getTimestamp(), createTimestamp, false );
- LOG.info( "Allocating new shard {} for edge meta {}", newShard, directedEdgeMeta );
+ logger.info( "Allocating new shard {} for edge meta {}", newShard, directedEdgeMeta );
final MutationBatch batch = this.edgeShardSerialization.writeShardMeta( scope, newShard, directedEdgeMeta );
@@ -252,50 +263,69 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
return true;
}
+
/**
- * Return true if the node has been created within our timeout. If this is the case, we dont' need to check
- * cassandra, we know it won't exist
- */
- private boolean isNewNode( DirectedEdgeMeta directedEdgeMeta ) {
+ * Return true if the node has been created within our timeout. If this is the case, we don't need to check
+ * cassandra, we know it won't exist
+ */
+ private boolean isNewNode( DirectedEdgeMeta directedEdgeMeta ) {
- //TODO: TN this is broken....
- //The timeout is in milliseconds. Time for a time uuid is 1/10000 of a milli, so we need to get the units correct
- final long timeNow = timeService.getCurrentTime();
+ //TODO: TN this is broken....
+ //The timeout is in milliseconds. Time for a time uuid is 1/10000 of a milli, so we need to get the units
+ // correct
+ final long timeNow = timeService.getCurrentTime();
- boolean isNew = true;
+ boolean isNew = true;
- for ( DirectedEdgeMeta.NodeMeta node : directedEdgeMeta.getNodes() ) {
+ for ( DirectedEdgeMeta.NodeMeta node : directedEdgeMeta.getNodes() ) {
- //short circuit
- if(!isNew || node.getId().getUuid().version() > 2){
- return false;
- }
+ //short circuit
+ if ( !isNew || node.getId().getUuid().version() > 2 ) {
+ return false;
+ }
- final long uuidTime = TimeUUIDUtils.getTimeFromUUID( node.getId().getUuid());
+ final long uuidTime = TimeUUIDUtils.getTimeFromUUID( node.getId().getUuid() );
- //take our uuid time and add 10 seconds, if the uuid is within 10 seconds of system time, we can consider it "new"
- final long newExpirationTimeout = uuidTime + 10000 ;
+ //take our uuid time and add 10 seconds, if the uuid is within 10 seconds of system time, we can consider
+ // it "new"
+ final long newExpirationTimeout = uuidTime + 1000;
- //our expiration is after our current time, treat it as new
- isNew = isNew && newExpirationTimeout > timeNow;
- }
+ //our expiration is after our current time, treat it as new
+ isNew = isNew && newExpirationTimeout > timeNow;
+ }
- return isNew;
- }
+ return isNew;
+ }
- private ShardEntryGroupIterator getCurrentStateIterator(final ApplicationScope scope, final ShardEntryGroup shardEntryGroup,
- final DirectedEdgeMeta directedEdgeMeta ){
+
+ /**
+ * Re-reads our shard groups to ensure we get a consistent view of all shards
+ */
+ private Iterator<ShardEntryGroup> getCurrentStateIterator( final ApplicationScope scope,
+ final ShardEntryGroup shardEntryGroup,
+ final DirectedEdgeMeta directedEdgeMeta ) {
final Shard start = shardEntryGroup.getMaxShard();
- final Iterator<Shard> shards = this.edgeShardSerialization.getShardMetaDataAudit( scope, Optional.fromNullable( start ), directedEdgeMeta );
+ //sanity check
+ if ( start == null ) {
+ logger.warn( "Could not audit shard group {}. Returning.", shardEntryGroup );
+ return Collections.<ShardEntryGroup>emptyList().iterator();
+ }
+
+ final Iterator<Shard> shards = this.edgeShardSerialization
+ .getShardMetaDataLocal( scope, Optional.fromNullable( start ), directedEdgeMeta );
+
+ if(!shards.hasNext()){
+ return Collections.<ShardEntryGroup>emptyList().iterator();
+ }
return new ShardEntryGroupIterator( shards, NO_OP_COMPACTION, scope, directedEdgeMeta );
}
- private final static class NoOpCompaction implements ShardGroupCompaction{
+ private final static class NoOpCompaction implements ShardGroupCompaction {
@Override
public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index 80375a8..ffd39bf 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -24,12 +24,8 @@ import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
-import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
@@ -160,7 +156,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
private CacheEntry getShards( final CacheKey key ) {
final Iterator<ShardEntryGroup> edges =
- nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta );
+ nodeShardAllocation.getShardsLocal( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta );
final CacheEntry cacheEntry = new CacheEntry( edges );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardConsistencyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardConsistencyImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardConsistencyImpl.java
index c76fae6..a52ec0c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardConsistencyImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardConsistencyImpl.java
@@ -47,12 +47,8 @@ public class ShardConsistencyImpl implements ShardConsistency{
@Override
public ConsistencyLevel getShardReadConsistency() {
- return null;
+ return ConsistencyLevel.valueOf( graphFig.getShardReadConsistency() );
}
- @Override
- public ConsistencyLevel getShardAuditConsistency() {
- return null;
- }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index fdb0952..b1ac52e 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -98,10 +98,6 @@ public class GraphManagerShardConsistencyIT {
protected Object originalShardSize;
- protected Object originalShardTimeout;
-
- protected Object originalShardDelta;
-
@Before
public void setupOrg() {
@@ -158,8 +154,8 @@ public class GraphManagerShardConsistencyIT {
};
-// final int numInjectors = 2;
- final int numInjectors = 1;
+ final int numInjectors = 2;
+// final int numInjectors = 1;
/**
* create 3 injectors. This way all the caches are independent of one another. This is the same as
@@ -173,11 +169,8 @@ public class GraphManagerShardConsistencyIT {
final long shardSize = graphFig.getShardSize();
- //we don't want to starve the cass runtime since it will be on the same box. Only take 50% of processing
- // power for writes
- final int numProcessors = Runtime.getRuntime().availableProcessors() / 2;
- final int numWorkersPerInjector = numProcessors / numInjectors;
+ final int numWorkersPerInjector = 1;
/**
@@ -200,9 +193,6 @@ public class GraphManagerShardConsistencyIT {
final AtomicLong writeCounter = new AtomicLong();
- //min stop time the min delta + 1 cache cycle timeout
- final long minExecutionTime = 10000;
-
log.info( "Writing {} edges per worker on {} workers in {} injectors", workerWriteLimit, numWorkersPerInjector,
numInjectors );
@@ -212,13 +202,15 @@ public class GraphManagerShardConsistencyIT {
+ //create multiple instances of injectors. This simulates multiple nodes, so that we can ensure we're not
+ //sharing state in our guice DI
for ( Injector injector : injectors ) {
final GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class );
for ( int i = 0; i < numWorkersPerInjector; i++ ) {
Future<Boolean> future = executor
- .submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
+ .submit( new Worker( gmf, generator, workerWriteLimit, writeCounter ) );
futures.add( future );
}
@@ -328,7 +320,9 @@ public class GraphManagerShardConsistencyIT {
//we're done
if ( compactedCount >= expectedShardCount ) {
- log.info( "All compactions complete, sleeping" );
+ log.info( "All compactions complete. Compacted shards are {}. Expected at least expectedShardCount" );
+
+ assertEquals("Compacted should match expected", expectedShardCount, compactedCount);
// final Object mutex = new Object();
//
@@ -385,16 +379,13 @@ public class GraphManagerShardConsistencyIT {
private final GraphManagerFactory factory;
private final EdgeGenerator generator;
private final long writeLimit;
- private final long minExecutionTime;
private final AtomicLong writeCounter;
- private Worker( final GraphManagerFactory factory, final EdgeGenerator generator, final long writeLimit,
- final long minExecutionTime, final AtomicLong writeCounter ) {
+ private Worker( final GraphManagerFactory factory, final EdgeGenerator generator, final long writeLimit, final AtomicLong writeCounter ) {
this.factory = factory;
this.generator = generator;
this.writeLimit = writeLimit;
- this.minExecutionTime = minExecutionTime;
this.writeCounter = writeCounter;
}
@@ -404,10 +395,9 @@ public class GraphManagerShardConsistencyIT {
GraphManager manager = factory.createEdgeManager( scope );
- final long startTime = System.currentTimeMillis();
-
+ long i;
- for ( long i = 0; i < writeLimit || System.currentTimeMillis() - startTime < minExecutionTime; i++ ) {
+ for ( i = 0; i < writeLimit ; i++ ) {
Edge edge = generator.newEdge();
@@ -428,6 +418,8 @@ public class GraphManagerShardConsistencyIT {
}
}
+ log.info( "Completed writing {} edges on worker", i );
+
return true;
}
@@ -528,12 +520,12 @@ public class GraphManagerShardConsistencyIT {
/**
* Create a new edge to persist
*/
- public Edge newEdge();
+ Edge newEdge();
/**
* Perform the search returning an observable edge
*/
- public Observable<Edge> doSearch( final GraphManager manager );
+ Observable<Edge> doSearch( final GraphManager manager );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index dae99b1..0fc2eb1 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -36,7 +36,6 @@ import org.apache.usergrid.persistence.core.util.IdGenerator;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByIdType;
-import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -526,7 +525,7 @@ public class NodeShardAllocationTest {
final Iterator<ShardEntryGroup> result =
- approximation.getShards( scope, Optional.<Shard>absent(), directedEdgeMeta );
+ approximation.getShardsLocal( scope, Optional.<Shard>absent(), directedEdgeMeta );
assertTrue( "Shards present", result.hasNext() );
@@ -616,7 +615,7 @@ public class NodeShardAllocationTest {
final Iterator<ShardEntryGroup> result =
- approximation.getShards( scope, Optional.<Shard>absent(), directedEdgeMeta );
+ approximation.getShardsLocal( scope, Optional.<Shard>absent(), directedEdgeMeta );
ShardEntryGroup shardEntryGroup = result.next();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
index 57996ec..96feb26 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
@@ -102,7 +102,7 @@ public class NodeShardCacheTest {
/**
* Simulate returning no shards at all.
*/
- when( allocation.getShards( same( scope ), same( max ), same( directedEdgeMeta ) ) )
+ when( allocation.getShardsLocal( same( scope ), same( max ), same( directedEdgeMeta ) ) )
//use "thenAnswer" so we always return the value, even if it's invoked more than 1 time.
.thenAnswer( new Answer<Iterator<ShardEntryGroup>>() {
@@ -191,7 +191,7 @@ public class NodeShardCacheTest {
/**
* Simulate returning no shards at all.
*/
- when( allocation.getShards( same( scope ), any( Optional.class ), same( directedEdgeMeta ) ) )
+ when( allocation.getShardsLocal( same( scope ), any( Optional.class ), same( directedEdgeMeta ) ) )
//use "thenAnswer" so we always return the value, even if it's invoked more than 1 time.
.thenAnswer( new Answer<Iterator<ShardEntryGroup>>() {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
index 71947f8..bcaf637 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
@@ -135,135 +135,6 @@ public class NodeShardApproximationTest {
}
- @Ignore("outdated and no longer relevant test")
- @Test
- public void testSingleShardMultipleThreads() throws ExecutionException, InterruptedException {
-
-
- NodeShardCounterSerialization serialization = new TestNodeShardCounterSerialization();
-
- final NodeShardApproximation approximation =
- new NodeShardApproximationImpl( new TestGraphFig(), serialization, new TestTimeService() );
-
-
- final int increments = 1000000;
- final int workers = Runtime.getRuntime().availableProcessors() * 2;
-
- final Id id = IdGenerator.createId( "test" );
- final String type = "type";
- final String type2 = "subType";
-
- final Shard shard = new Shard( 10000, 0, true );
-
- final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( id, type, type2 );
-
- ExecutorService executor = Executors.newFixedThreadPool( workers );
-
- List<Future<Long>> futures = new ArrayList<>( workers );
-
- for ( int i = 0; i < workers; i++ ) {
-
- final Future<Long> future = executor.submit( new Callable<Long>() {
- @Override
- public Long call() throws Exception {
-
- for ( int i = 0; i < increments; i++ ) {
- approximation.increment( scope, shard, 1, directedEdgeMeta );
- }
-
- return 0l;
- }
- } );
-
- futures.add( future );
- }
-
-
- for ( Future<Long> future : futures ) {
- future.get();
- }
-
- waitForFlush( approximation );
- //get our count. It should be accurate b/c we only have 1 instance
-
- final long returnedCount = approximation.getCount( scope, shard, directedEdgeMeta );
- final long expected = workers * increments;
-
-
- assertEquals( expected, returnedCount );
-
- //test we get nothing with the other type
-
- final long emptyCount =
- approximation.getCount( scope, shard, DirectedEdgeMeta.fromSourceNodeTargetType( id, type, type2 ) );
-
-
- assertEquals( 0, emptyCount );
- }
-
-
- @Ignore("outdated and no longer relevant test")
- @Test
- public void testMultipleShardMultipleThreads() throws ExecutionException, InterruptedException {
-
-
- NodeShardCounterSerialization serialization = new TestNodeShardCounterSerialization();
-
- final NodeShardApproximation approximation =
- new NodeShardApproximationImpl( new TestGraphFig(), serialization, new TestTimeService() );
-
-
- final int increments = 1000000;
- final int workers = Runtime.getRuntime().availableProcessors() * 2;
-
- final Id id = IdGenerator.createId( "test" );
- final String type = "type";
- final String type2 = "subType";
-
- final AtomicLong shardIdCounter = new AtomicLong();
-
-
- final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( id, type, type2 );
-
-
- ExecutorService executor = Executors.newFixedThreadPool( workers );
-
- List<Future<Shard>> futures = new ArrayList<>( workers );
-
- for ( int i = 0; i < workers; i++ ) {
-
- final Future<Shard> future = executor.submit( new Callable<Shard>() {
- @Override
- public Shard call() throws Exception {
-
- final long threadShardId = shardIdCounter.incrementAndGet();
-
- final Shard shard = new Shard( threadShardId, 0, true );
-
- for ( int i = 0; i < increments; i++ ) {
- approximation.increment( scope, shard, 1, directedEdgeMeta );
- }
-
- return shard;
- }
- } );
-
- futures.add( future );
- }
-
-
- for ( Future<Shard> future : futures ) {
- final Shard shardId = future.get();
-
- waitForFlush( approximation );
-
- final long returnedCount = approximation.getCount( scope, shardId, directedEdgeMeta );
-
- assertEquals( increments, returnedCount );
- }
- }
-
-
private void waitForFlush( NodeShardApproximation approximation ) throws InterruptedException {
approximation.beginFlush();
@@ -277,346 +148,4 @@ public class NodeShardApproximationTest {
}
- /**
- * These are created b/c we can't use Mockito. It OOM's with keeping track of all the mock invocations
- */
-
- private static class TestNodeShardCounterSerialization implements NodeShardCounterSerialization {
-
- private Counter copy = new Counter();
-
-
- @Override
- public MutationBatch flush( final Counter counter ) {
- copy.merge( counter );
- return new TestMutationBatch();
- }
-
-
- @Override
- public long getCount( final ShardKey key ) {
- return copy.get( key );
- }
-
-
- @Override
- public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
- }
-
-
- /**
- * Simple test mutation to no-op during tests
- */
- private
- static class TestMutationBatch implements MutationBatch {
-
- @Override
- public <K, C> ColumnListMutation<C> withRow( final ColumnFamily<K, C> columnFamily, final K rowKey ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public <K> void deleteRow( final Iterable<? extends ColumnFamily<K, ?>> columnFamilies, final K rowKey ) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public void discardMutations() {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public void mergeShallow( final MutationBatch other ) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public boolean isEmpty() {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public int getRowCount() {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public Map<ByteBuffer, Set<String>> getRowKeys() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public MutationBatch pinToHost( final Host host ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public MutationBatch setConsistencyLevel( final ConsistencyLevel consistencyLevel ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public MutationBatch withConsistencyLevel( final ConsistencyLevel consistencyLevel ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public MutationBatch withRetryPolicy( final RetryPolicy retry ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public MutationBatch usingWriteAheadLog( final WriteAheadLog manager ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public MutationBatch lockCurrentTimestamp() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public MutationBatch setTimeout( final long timeout ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public MutationBatch setTimestamp( final long timestamp ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public MutationBatch withTimestamp( final long timestamp ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public MutationBatch withAtomicBatch( final boolean condition ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public ByteBuffer serialize() throws Exception {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public void deserialize( final ByteBuffer data ) throws Exception {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public OperationResult<Void> execute() throws ConnectionException {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public ListenableFuture<OperationResult<Void>> executeAsync() throws ConnectionException {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
- }
-
-
- private static class TestGraphFig implements GraphFig {
-
- @Override
- public int getScanPageSize() {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public int getRepairConcurrentSize() {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public double getShardRepairChance() {
- return 0;
- }
-
-
- @Override
- public long getShardSize() {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
-
-
- @Override
- public int getShardAuditWorkerCount() {
- return 0;
- }
-
-
- @Override
- public int getShardAuditWorkerQueueSize() {
- return 0;
- }
-
-
- @Override
- public long getCounterFlushCount() {
- return 100000l;
- }
-
-
- @Override
- public long getCounterFlushInterval() {
- return 30000l;
- }
-
-
- @Override
- public int getCounterFlushQueueSize() {
- return 10000;
- }
-
-
- @Override
- public String getShardWriteConsistency() {
- return null;
- }
-
-
- @Override
- public String getShardReadConsistency() {
- return null;
- }
-
-
- @Override
- public String getShardAuditConsistency() {
- return null;
- }
-
-
- @Override
- public void addPropertyChangeListener( final PropertyChangeListener propertyChangeListener ) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public void removePropertyChangeListener( final PropertyChangeListener propertyChangeListener ) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public OptionState[] getOptions() {
- return new OptionState[0]; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public OptionState getOption( final String s ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public String getKeyByMethod( final String s ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public Object getValueByMethod( final String s ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public Properties filterOptions( final Properties properties ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public Map<String, Object> filterOptions( final Map<String, Object> stringObjectMap ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public void override( final String s, final String s2 ) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public boolean setOverrides( final Overrides overrides ) {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public Overrides getOverrides() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public void bypass( final String s, final String s2 ) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public boolean setBypass( final Bypass bypass ) {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public Bypass getBypass() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public Class getFigInterface() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public boolean isSingleton() {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
- }
-
-
- private static class TestTimeService implements TimeService {
-
- @Override
- public long getCurrentTime() {
- return System.currentTimeMillis();
- }
- }
}