You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/11/03 21:47:59 UTC
usergrid git commit: Adds delta to avoid accidentally deleting a
shard as it's allocated
Repository: usergrid
Updated Branches:
refs/heads/USERGRID-909 817d7ffb4 -> 06d34dbce
Adds delta to avoid accidentally deleting a shard as it's allocated
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/06d34dbc
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/06d34dbc
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/06d34dbc
Branch: refs/heads/USERGRID-909
Commit: 06d34dbce75b2868649aea3b44c651ecd3799d30
Parents: 817d7ff
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Nov 3 13:47:52 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Nov 3 13:47:52 2015 -0700
----------------------------------------------------------------------
.../impl/ScopedCacheSerializationImpl.java | 29 ++++--------
.../usergrid/persistence/graph/GraphFig.java | 20 ++++++--
.../persistence/graph/guice/GraphModule.java | 2 +
.../impl/shard/ShardEntryGroup.java | 18 +++++--
.../shard/impl/NodeShardAllocationImpl.java | 49 +++++++++-----------
.../shard/impl/ShardEntryGroupIterator.java | 6 ++-
.../graph/GraphManagerShardConsistencyIT.java | 37 ++++++---------
.../impl/shard/NodeShardAllocationTest.java | 12 +++--
.../impl/shard/NodeShardGroupSearchTest.java | 14 +++---
.../impl/shard/ShardEntryGroupTest.java | 47 +++++++------------
.../shard/impl/ShardEntryGroupIteratorTest.java | 6 +--
stack/corepersistence/locks/pom.xml | 8 ++--
12 files changed, 121 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java b/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
index 1439bc5..bed84c6 100644
--- a/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
+++ b/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.hash.Funnel;
import com.google.common.hash.PrimitiveSink;
@@ -79,13 +80,15 @@ public class ScopedCacheSerializationImpl<K,V> implements ScopedCacheSerializati
private static final int[] NUM_BUCKETS = {20};
/** How to funnel keys for buckets */
- private static final Funnel<String> MAP_KEY_FUNNEL = new Funnel<String>() {
+ private static final Funnel<String> MAP_KEY_FUNNEL =
+ ( Funnel<String> ) ( key, into ) -> into.putString(key, StringHashUtils.UTF8 );
+
+
+ /**
+ * One second gc grace since our columns expire
+ */
+ private static final int GC_GRACE = 1;
- @Override
- public void funnel( final String key, final PrimitiveSink into ) {
- into.putString(key, StringHashUtils.UTF8);
- }
- };
/**
* Locator to get us all buckets
@@ -252,18 +255,6 @@ public class ScopedCacheSerializationImpl<K,V> implements ScopedCacheSerializati
}
- private class MutationBatchExec implements Callable<Void> {
- private final MutationBatch myBatch;
- private MutationBatchExec(MutationBatch batch) {
- myBatch = batch;
- }
- @Override
- public Void call() throws Exception {
- myBatch.execute();
- return null;
- }
- }
-
private OperationResult<Void> executeBatch(MutationBatch batch) {
try {
@@ -284,7 +275,7 @@ public class ScopedCacheSerializationImpl<K,V> implements ScopedCacheSerializati
BytesType.class.getSimpleName(),
BytesType.class.getSimpleName(),
BytesType.class.getSimpleName(),
- MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
+ MultiTennantColumnFamilyDefinition.CacheOption.KEYS, Optional.of(GC_GRACE) );
return Arrays.asList(scopedCache);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 82dfe51..b209835 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -43,7 +43,6 @@ public interface GraphFig extends GuicyFig {
*
* You will want to set this value to no more than 2x tombstone_failure_threshold to avoid failures on read during
* shard compaction.
- *
*/
String SHARD_SIZE = "usergrid.graph.shard.size";
@@ -68,6 +67,14 @@ public interface GraphFig extends GuicyFig {
String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size";
+ /**
+ * The minimum amount of time than can occur (in millis) between shard allocation and deletion.
+ *
+ * Note that you should also pad this for node clock drift. A good value for this would be 60 seconds, assuming you
+ * have NTP and your nodes are reasonably (< 1 second) synced
+ */
+ String SHARD_DELETE_DELTA = "usergrid.graph.shard.delete.delta";
+
@Default( "1000" )
@Key( SCAN_PAGE_SIZE )
@@ -80,9 +87,9 @@ public interface GraphFig extends GuicyFig {
/**
- * A 1% repair chance. On average we'll check to repair on 1 out of every 100 reads
+ * A 2% repair chance. On average we'll check to repair on 2 out of every 100 reads
*/
- @Default( ".01" )
+ @Default( ".02" )
@Key( SHARD_REPAIR_CHANCE )
double getShardRepairChance();
@@ -104,6 +111,11 @@ public interface GraphFig extends GuicyFig {
@Key( COUNTER_WRITE_FLUSH_QUEUE_SIZE )
int getCounterFlushQueueSize();
+ @Default( "60000" )
+ @Key( SHARD_DELETE_DELTA )
+ long getShardDeleteDelta();
+
+
@Default( "CL_EACH_QUORUM" )
@Key( SHARD_WRITE_CONSISTENCY )
String getShardWriteConsistency();
@@ -114,7 +126,5 @@ public interface GraphFig extends GuicyFig {
@Default( "CL_LOCAL_QUORUM" )
@Key( SHARD_READ_CONSISTENCY )
String getShardReadConsistency();
-
-
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 752f417..067f33a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -64,10 +64,12 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroup
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupDeletion;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerializationImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.AsyncTaskExecutorImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeShardSerializationImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardGroupSearchImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupDeletionImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeColumnFamilies;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeShardStrategy;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
index c33e0d8..1379209 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -43,6 +43,10 @@ public class ShardEntryGroup {
private List<Shard> shards;
+ private final long delta;
+
+ private long maxCreatedTime;
+
private Shard compactionTarget;
private Shard rootShard;
@@ -51,7 +55,9 @@ public class ShardEntryGroup {
/**
* The max delta we accept in milliseconds for create time to be considered a member of this group
*/
- public ShardEntryGroup() {
+ public ShardEntryGroup( final long delta ) {
+ Preconditions.checkArgument( delta > 0, "delta must be greater than 0" );
+ this.delta = delta;
this.shards = new ArrayList<>();
}
@@ -101,6 +107,8 @@ public class ShardEntryGroup {
private void addShardInternal( final Shard shard ) {
shards.add( shard );
+ maxCreatedTime = Math.max( maxCreatedTime, shard.getCreatedTime() );
+
//we're changing our structure, unset the compaction target
compactionTarget = null;
}
@@ -293,6 +301,7 @@ public class ShardEntryGroup {
* We don't have enough shards to compact, ignore
*/
return getCompactionTarget() != null;
+
}
@@ -346,8 +355,11 @@ public class ShardEntryGroup {
@Override
public String toString() {
return "ShardEntryGroup{" +
- "shards=" + shards +
- ", compactionTarget=" + compactionTarget +
+ "compactionTarget=" + compactionTarget +
+ ", shards=" + shards +
+ ", delta=" + delta +
+ ", maxCreatedTime=" + maxCreatedTime +
+ ", rootShard=" + rootShard +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 8f9e27f..436fd74 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -74,8 +74,9 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
@Inject
public NodeShardAllocationImpl( final EdgeShardSerialization edgeShardSerialization,
final EdgeColumnFamilies edgeColumnFamilies,
- final ShardedEdgeSerialization shardedEdgeSerialization, final TimeService timeService,
- final GraphFig graphFig, final ShardGroupCompaction shardGroupCompaction) {
+ final ShardedEdgeSerialization shardedEdgeSerialization,
+ final TimeService timeService, final GraphFig graphFig,
+ final ShardGroupCompaction shardGroupCompaction ) {
this.edgeShardSerialization = edgeShardSerialization;
this.edgeColumnFamilies = edgeColumnFamilies;
this.shardedEdgeSerialization = shardedEdgeSerialization;
@@ -100,26 +101,26 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
existingShards = edgeShardSerialization.getShardMetaDataLocal( scope, maxShardId, directedEdgeMeta );
}
- /**
- * We didn't get anything out of cassandra, so we need to create the minumum shard
- */
- if ( existingShards == null || !existingShards.hasNext() ) {
-
+ /**
+ * We didn't get anything out of cassandra, so we need to create the minumum shard
+ */
+ if ( existingShards == null || !existingShards.hasNext() ) {
- final MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, Shard.MIN_SHARD, directedEdgeMeta );
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to connect to casandra", e );
- }
- existingShards = Collections.singleton( Shard.MIN_SHARD ).iterator();
+ final MutationBatch batch =
+ edgeShardSerialization.writeShardMeta( scope, Shard.MIN_SHARD, directedEdgeMeta );
+ try {
+ batch.execute();
}
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
+
+ existingShards = Collections.singleton( Shard.MIN_SHARD ).iterator();
}
- return new ShardEntryGroupIterator( existingShards, shardGroupCompaction, scope, directedEdgeMeta );
+ return new ShardEntryGroupIterator( existingShards, graphFig.getShardDeleteDelta(), shardGroupCompaction, scope, directedEdgeMeta );
}
@@ -171,7 +172,6 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
}
-
final long shardSize = graphFig.getShardSize();
@@ -203,8 +203,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
if ( !edges.hasNext() ) {
- logger.debug(
- "Tried to allocate a new shard for group {}, but no max value could be found in that row",
+ logger.debug( "Tried to allocate a new shard for group {}, but no max value could be found in that row",
readShards );
return false;
}
@@ -225,7 +224,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
//we hit a pivot shard, set it since it could be the last one we encounter
if ( i % shardSize == 0 ) {
marked = edges.next();
- logger.debug("Found an edge {} to split at index {}", marked, i);
+ logger.debug( "Found an edge {} to split at index {}", marked, i );
}
else {
edges.next();
@@ -233,8 +232,6 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
}
-
-
/**
* Sanity check in case our counters become severely out of sync with our edge state in cassandra.
*/
@@ -323,13 +320,13 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
return Collections.<ShardEntryGroup>emptyList().iterator();
}
- return new ShardEntryGroupIterator( shards, NO_OP_COMPACTION, scope, directedEdgeMeta );
+ return new ShardEntryGroupIterator( shards, graphFig.getShardDeleteDelta(), NO_OP_COMPACTION, scope, directedEdgeMeta );
}
/**
- * Class that just ignores compaction events, since we're already evaluating the events. A bit of a hack
- * that shows we need some refactoring
+ * Class that just ignores compaction events, since we're already evaluating the events. A bit of a hack that shows
+ * we need some refactoring
*/
private final static class NoOpCompaction implements ShardGroupCompaction {
@@ -342,6 +339,4 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
return Futures.immediateFuture( AuditResult.NOT_CHECKED );
}
}
-
-
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
index cbe35b6..3a1c4d8 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
@@ -26,6 +26,7 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
private final ShardGroupCompaction shardGroupCompaction;
private final PushbackIterator<Shard> sourceIterator;
+ private final long minDelta;
private final ApplicationScope scope;
private final DirectedEdgeMeta directedEdgeMeta;
@@ -39,7 +40,7 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
* @param shardIterator The iterator of all shards. Order is expected to be by the shard index from Long.MAX to
* Long.MIN
*/
- public ShardEntryGroupIterator( final Iterator<Shard> shardIterator,
+ public ShardEntryGroupIterator( final Iterator<Shard> shardIterator, final long minDelta,
final ShardGroupCompaction shardGroupCompaction, final ApplicationScope scope,
final DirectedEdgeMeta directedEdgeMeta ) {
@@ -49,6 +50,7 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
this.directedEdgeMeta = directedEdgeMeta;
this.sourceIterator = new PushbackIterator( shardIterator );
this.shardGroupCompaction = shardGroupCompaction;
+ this.minDelta = minDelta;
}
@@ -95,7 +97,7 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
while ( sourceIterator.hasNext() ) {
if ( next == null ) {
- next = new ShardEntryGroup( );
+ next = new ShardEntryGroup( minDelta);
}
final Shard shard = sourceIterator.next();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index f16cc78..2ac67aa 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -54,8 +54,11 @@ import org.apache.usergrid.persistence.core.util.IdGenerator;
import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardGroupSearch;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl;
import org.apache.usergrid.persistence.model.entity.Id;
import com.codahale.metrics.Meter;
@@ -86,9 +89,6 @@ public class GraphManagerShardConsistencyIT {
private static final Meter writeMeter = registry.meter( "writeThroughput" );
- private static final Slf4jReporter reporter =
- Slf4jReporter.forRegistry( registry ).outputTo( log ).convertRatesTo( TimeUnit.SECONDS )
- .convertDurationsTo( TimeUnit.MILLISECONDS ).build();
private Slf4jReporter reporter;
@@ -97,10 +97,6 @@ public class GraphManagerShardConsistencyIT {
protected Object originalShardSize;
- protected Object originalShardTimeout;
-
- protected Object originalShardDelta;
-
protected ListeningExecutorService executor;
@@ -131,10 +127,14 @@ public class GraphManagerShardConsistencyIT {
@After
public void tearDown() {
- reporter.stop();
- reporter.report();
+ if(reporter != null) {
+ reporter.stop();
+ reporter.report();
+ }
- executor.shutdownNow();
+ if(executor != null) {
+ executor.shutdownNow();
+ }
}
@@ -373,8 +373,6 @@ public class GraphManagerShardConsistencyIT {
Thread.sleep( 2000 );
}
-
- executor.shutdownNow();
}
@@ -484,10 +482,6 @@ public class GraphManagerShardConsistencyIT {
final AtomicLong writeCounter = new AtomicLong();
- //min stop time the min delta + 1 cache cycle timeout
- final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout();
-
-
log.info( "Writing {} edges per worker on {} workers in {} injectors", workerWriteLimit, numWorkersPerInjector,
numInjectors );
@@ -498,10 +492,9 @@ public class GraphManagerShardConsistencyIT {
for ( Injector injector : injectors ) {
final GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class );
-
for ( int i = 0; i < numWorkersPerInjector; i++ ) {
Future<Boolean> future =
- executor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
+ executor.submit( new Worker( gmf, generator, workerWriteLimit, writeCounter ) );
futures.add( future );
}
@@ -515,7 +508,7 @@ public class GraphManagerShardConsistencyIT {
}
//now get all our shards
- final NodeShardCache cache = getInstance( injectors, NodeShardCache.class );
+ final NodeShardGroupSearch nodeShardGroupSearch = getInstance( injectors, NodeShardGroupSearch.class );
final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, edgeType );
@@ -531,7 +524,7 @@ public class GraphManagerShardConsistencyIT {
final Iterator<ShardEntryGroup> existingShardGroups =
- cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
+ nodeShardGroupSearch.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
int shardCount = 0;
while ( existingShardGroups.hasNext() ) {
@@ -621,7 +614,7 @@ public class GraphManagerShardConsistencyIT {
shardCount = 0;
//we have to get it from the cache, because this will trigger the compaction process
- final Iterator<ShardEntryGroup> groups = cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
+ final Iterator<ShardEntryGroup> groups = nodeShardGroupSearch.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
ShardEntryGroup group = null;
@@ -759,7 +752,7 @@ public class GraphManagerShardConsistencyIT {
/**
* Perform the search returning an observable edge
*/
- Observable<Edge> doSearch( final GraphManager manager );
+ Observable<MarkedEdge> doSearch( final GraphManager manager );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index 45047f1..ee2fb19 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -82,6 +82,8 @@ public class NodeShardAllocationTest {
when( graphFig.getShardSize() ).thenReturn( 20000l );
+ when(graphFig.getShardDeleteDelta()).thenReturn( 60000l );
+
}
@@ -114,7 +116,7 @@ public class NodeShardAllocationTest {
final Shard firstShard = new Shard( 0l, 0l, true );
final Shard futureShard = new Shard( 10000l, timeservicetime, false );
- final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+ final ShardEntryGroup shardEntryGroup = new ShardEntryGroup(10000l );
shardEntryGroup.addShard( futureShard );
shardEntryGroup.addShard( firstShard );
@@ -160,7 +162,7 @@ public class NodeShardAllocationTest {
final Shard futureShard = new Shard( 10000l, timeservicetime, true );
- final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+ final ShardEntryGroup shardEntryGroup = new ShardEntryGroup(10000l );
shardEntryGroup.addShard( futureShard );
@@ -209,7 +211,7 @@ public class NodeShardAllocationTest {
final Shard firstShard = new Shard( 0l, 0l, true );
- final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+ final ShardEntryGroup shardEntryGroup = new ShardEntryGroup(10000l );
shardEntryGroup.addShard( firstShard );
final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( nodeId, type, subType );
@@ -323,7 +325,7 @@ public class NodeShardAllocationTest {
final Shard firstShard = new Shard( 0l, 0l, true );
- final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+ final ShardEntryGroup shardEntryGroup = new ShardEntryGroup(10000l );
shardEntryGroup.addShard( firstShard );
final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( nodeId, type, subType );
@@ -413,7 +415,7 @@ public class NodeShardAllocationTest {
final Shard firstShard = new Shard( 0l, 0l, true );
- final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+ final ShardEntryGroup shardEntryGroup = new ShardEntryGroup(10000l );
shardEntryGroup.addShard( firstShard );
final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( nodeId, type, subType );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardGroupSearchTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardGroupSearchTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardGroupSearchTest.java
index c9fe4a4..910dde0 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardGroupSearchTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardGroupSearchTest.java
@@ -94,7 +94,7 @@ public class NodeShardGroupSearchTest {
final Optional max = Optional.absent();
- final ShardEntryGroup group = new ShardEntryGroup();
+ final ShardEntryGroup group = new ShardEntryGroup(10000l);
group.addShard( new Shard( 0, 0, true ) );
@@ -168,14 +168,14 @@ public class NodeShardGroupSearchTest {
/**
* Simulate returning all shards
*/
- final ShardEntryGroup minShardGroup = new ShardEntryGroup();
+ final ShardEntryGroup minShardGroup = new ShardEntryGroup(10000l);
minShardGroup.addShard( minShard );
- final ShardEntryGroup midShardGroup = new ShardEntryGroup();
+ final ShardEntryGroup midShardGroup = new ShardEntryGroup(10000l);
midShardGroup.addShard( midShard );
- final ShardEntryGroup maxShardGroup = new ShardEntryGroup();
+ final ShardEntryGroup maxShardGroup = new ShardEntryGroup(10000l);
maxShardGroup.addShard( maxShard );
@@ -211,7 +211,7 @@ public class NodeShardGroupSearchTest {
//use "thenAnswer" so we always return the value, even if it's invoked more than 1 time.
.thenAnswer( answer -> new ShardEntryGroupIterator( Collections.singleton( minShard ).iterator(),
- NoOpShardCompaction.INSTANCE, scope, directedEdgeMeta ) );
+ 10000l, NoOpShardCompaction.INSTANCE, scope, directedEdgeMeta ) );
/**
@@ -243,7 +243,7 @@ public class NodeShardGroupSearchTest {
//use "thenAnswer" so we always return the value, even if it's invoked more than 1 time.
.thenAnswer( answer -> new ShardEntryGroupIterator( Arrays.asList( midShard, minShard ).iterator(),
- NoOpShardCompaction.INSTANCE, scope, directedEdgeMeta ) );
+ 10000l, NoOpShardCompaction.INSTANCE, scope, directedEdgeMeta ) );
/**
@@ -276,7 +276,7 @@ public class NodeShardGroupSearchTest {
//use "thenAnswer" so we always return the value, even if it's invoked more than 1 time.
.thenAnswer(
answer -> new ShardEntryGroupIterator( Arrays.asList( maxShard, midShard, minShard ).iterator(),
- NoOpShardCompaction.INSTANCE, scope, directedEdgeMeta ) );
+ 10000l, NoOpShardCompaction.INSTANCE, scope, directedEdgeMeta ) );
//check getting equal to our min, mid and max
http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
index d609d96..d9f8a2b 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
@@ -40,7 +40,7 @@ public class ShardEntryGroupTest {
Shard rootShard = new Shard( 0, 0, false );
- ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+ ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l );
final boolean result = shardEntryGroup.addShard( rootShard );
@@ -62,7 +62,7 @@ public class ShardEntryGroupTest {
Shard secondShard = new Shard( 1000, 1001, false );
- ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+ ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l );
boolean result = shardEntryGroup.addShard( secondShard );
@@ -73,7 +73,6 @@ public class ShardEntryGroupTest {
assertTrue( "Shard added", result );
-
assertFalse( "First shard cannot be deleted", shardEntryGroup.canBeDeleted( firstShard ) );
assertFalse( "Second shard cannot be deleted", shardEntryGroup.canBeDeleted( secondShard ) );
@@ -81,9 +80,6 @@ public class ShardEntryGroupTest {
assertFalse( "Duplicate shard id cannot be deleted", shardEntryGroup.canBeDeleted( secondShard ) );
assertNull( "Can't compact, no min compacted shard present", shardEntryGroup.getCompactionTarget() );
-
-
-
}
@@ -97,7 +93,7 @@ public class ShardEntryGroupTest {
Shard secondShard = new Shard( 1000, 1001, false );
- ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+ ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l );
boolean result = shardEntryGroup.addShard( secondShard );
@@ -123,11 +119,9 @@ public class ShardEntryGroupTest {
//we should compact these
assertTrue( "Merge should be run", shardEntryGroup.shouldCompact() );
-
}
-
@Test
public void lowerTimestampHigherShard() {
@@ -139,7 +133,7 @@ public class ShardEntryGroupTest {
Shard compactedShard = new Shard( 500, 200, true );
- ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+ ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l );
boolean result = shardEntryGroup.addShard( firstShard );
@@ -154,19 +148,15 @@ public class ShardEntryGroupTest {
assertTrue( "Shard added", result );
+ assertFalse( "First shard cannot be deleted", shardEntryGroup.canBeDeleted( secondShard ) );
- assertFalse( "First shard cannot be deleted", shardEntryGroup.canBeDeleted( secondShard ) );
-
- assertTrue( "Second shard can be deleted", shardEntryGroup.canBeDeleted( firstShard ) );
-
- assertEquals( "Can't compact, no min compacted shard present", secondShard, shardEntryGroup.getCompactionTarget() );
-
-
+ assertTrue( "Second shard can be deleted", shardEntryGroup.canBeDeleted( firstShard ) );
+ assertEquals( "Can't compact, no min compacted shard present", secondShard,
+ shardEntryGroup.getCompactionTarget() );
}
-
@Test
public void multipleShardGroups() {
@@ -179,7 +169,7 @@ public class ShardEntryGroupTest {
Shard compactedShard2 = new Shard( 800, 7000, true );
- ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+ ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l );
boolean result = shardEntryGroup.addShard( firstShard );
@@ -197,7 +187,7 @@ public class ShardEntryGroupTest {
assertFalse( "Shouldn't add since it's compacted", result );
- ShardEntryGroup secondGroup = new ShardEntryGroup( );
+ ShardEntryGroup secondGroup = new ShardEntryGroup( 10000l );
result = secondGroup.addShard( compactedShard2 );
@@ -214,7 +204,7 @@ public class ShardEntryGroupTest {
Shard compactedShard1 = new Shard( 900, 8000, true );
- ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+ ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l );
boolean result = shardEntryGroup.addShard( firstShard );
@@ -236,9 +226,7 @@ public class ShardEntryGroupTest {
assertEquals( "Same shard for merge target", secondShard, shardEntryGroup.getCompactionTarget() );
//Should return true, we can merge
- assertTrue( "Merge cannot be run within min time",
- shardEntryGroup.shouldCompact() );
-
+ assertTrue( "Merge cannot be run within min time", shardEntryGroup.shouldCompact() );
}
@@ -255,7 +243,7 @@ public class ShardEntryGroupTest {
Shard compactedShard1 = new Shard( 900, 8000, true );
- ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+ ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l );
boolean result = shardEntryGroup.addShard( firstShard );
@@ -292,7 +280,7 @@ public class ShardEntryGroupTest {
Shard compactedShard = new Shard( 900, 8000, true );
- ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+ ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l );
boolean result = shardEntryGroup.addShard( ignoredProposedShard );
@@ -307,7 +295,7 @@ public class ShardEntryGroupTest {
assertTrue( "Shard added", result );
- Collection<Shard> writeShards = shardEntryGroup.getWriteShards(newAllocatedCompactionTarget.getShardIndex() );
+ Collection<Shard> writeShards = shardEntryGroup.getWriteShards( newAllocatedCompactionTarget.getShardIndex() );
assertEquals( "Shard size correct", 1, writeShards.size() );
@@ -319,7 +307,6 @@ public class ShardEntryGroupTest {
assertEquals( "Shard size correct", 1, writeShards.size() );
assertTrue( "Lowest new shard present", writeShards.contains( compactedShard ) );
-
}
@@ -332,7 +319,7 @@ public class ShardEntryGroupTest {
Shard rootShard = new Shard( 0, 0, false );
- ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+ ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l );
boolean result = shardEntryGroup.addShard( secondShard );
@@ -357,7 +344,7 @@ public class ShardEntryGroupTest {
Shard lowShard = new Shard( 10000, 1000, false );
- ShardEntryGroup shardEntryGroup = new ShardEntryGroup( );
+ ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l );
boolean result = shardEntryGroup.addShard( highShard );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
index 6a0e1e5..0ab60a3 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
@@ -59,7 +59,7 @@ public class ShardEntryGroupIteratorTest {
final Iterator<Shard> noShards = Collections.<Shard>emptyList().iterator();
//should blow up, our iterator is empty
- new ShardEntryGroupIterator( noShards, shardGroupCompaction, scope, directedEdgeMeta );
+ new ShardEntryGroupIterator( noShards, 10000l, shardGroupCompaction, scope, directedEdgeMeta );
}
@@ -77,7 +77,7 @@ public class ShardEntryGroupIteratorTest {
final Iterator<Shard> noShards = Collections.singleton( minShard ).iterator();
ShardEntryGroupIterator entryGroupIterator =
- new ShardEntryGroupIterator( noShards, shardGroupCompaction, scope, directedEdgeMeta );
+ new ShardEntryGroupIterator( noShards, 10000l, shardGroupCompaction, scope, directedEdgeMeta );
assertTrue( "Root shard always present", entryGroupIterator.hasNext() );
@@ -154,7 +154,7 @@ public class ShardEntryGroupIteratorTest {
ShardEntryGroupIterator entryGroupIterator =
- new ShardEntryGroupIterator( noShards, shardGroupCompaction, scope, directedEdgeMeta );
+ new ShardEntryGroupIterator( noShards, 10000l, shardGroupCompaction, scope, directedEdgeMeta );
assertTrue( "max group present", entryGroupIterator.hasNext() );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/locks/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/locks/pom.xml b/stack/corepersistence/locks/pom.xml
index de89053..6f6f77a 100644
--- a/stack/corepersistence/locks/pom.xml
+++ b/stack/corepersistence/locks/pom.xml
@@ -20,10 +20,10 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>persistence</artifactId>
- <groupId>org.apache.usergrid</groupId>
- <version>2.1.0-SNAPSHOT</version>
- </parent>
+ <artifactId>persistence</artifactId>
+ <groupId>org.apache.usergrid</groupId>
+ <version>2.1.1-SNAPSHOT</version>
+ </parent>
<modelVersion>4.0.0</modelVersion>
<description>The module for handling all distributed locks</description>