You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/04/12 00:21:33 UTC
[7/7] git commit: Added tests for shard allocation and caching.
Added tests for shard allocation and caching.
Time UUID ordering when UUID is generated appears to be broken. Need to fix this.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/f1c61867
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/f1c61867
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/f1c61867
Branch: refs/heads/asyncqueue
Commit: f1c61867aa7ff73b8cbe17072b5e5ae47831d2f1
Parents: 21d4b64
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Apr 11 14:45:35 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Apr 11 14:45:35 2014 -0700
----------------------------------------------------------------------
.../EntityCollectionManagerFactory.java | 4 +-
.../MultiTennantColumnFamilyDefinition.java | 2 +-
.../MvccLogEntrySerializationStrategyImpl.java | 2 +-
stack/corepersistence/graph/pom.xml | 6 +
.../usergrid/persistence/graph/GraphFig.java | 26 +-
.../graph/impl/shard/NodeShardAllocation.java | 11 -
.../impl/shard/NodeShardAllocationImpl.java | 138 ++++---
.../impl/shard/NodeShardApproximation.java | 22 +-
.../impl/shard/NodeShardApproximationImpl.java | 75 +---
.../graph/impl/shard/NodeShardCacheImpl.java | 19 +-
.../EdgeSeriesCounterSerialization.java | 34 +-
.../serialization/EdgeSeriesSerialization.java | 2 +-
.../graph/impl/shard/HyperLogLogTest.java | 8 +-
.../impl/shard/NodeShardAllocationTest.java | 411 +++++++++++++++++++
.../impl/shard/NodeShardApproximationTest.java | 141 +++++++
.../graph/impl/shard/NodeShardCacheTest.java | 4 +-
stack/corepersistence/model/pom.xml | 8 +
.../persistence/model/util/UUIDGenerator.java | 217 +++++++---
.../persistence/model/util/UUIDUtils.java | 38 ++
.../model/util/UUIDGeneratorTest.java | 17 +
20 files changed, 946 insertions(+), 239 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
index a2f2ff7..eb5fb14 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
@@ -28,7 +28,7 @@ public interface EntityCollectionManagerFactory {
/**
* Create a new EntityCollectionManager for the given context.
* The EntityCollectionManager can safely be used on the current thread
- * and will cache responses. The returned instance should not be shared
+ * and will shard responses. The returned instance should not be shared
* among threads it will not be guaranteed to be thread safe.
*
* @param collectionScope The collectionScope collectionScope to use
@@ -44,7 +44,7 @@ public interface EntityCollectionManagerFactory {
/**
* Create a new EntityCollectionManagerSync for the given context.
* The EntityCollectionManager can safely be used on the current thread
- * and will cache responses. The returned instance should not be shared
+ * and will shard responses. The returned instance should not be shared
* among threads it will not be guaranteed to be thread safe.
* This implementation will be synchronous. Try to use the consistency
* implementation if possible
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/astyanax/MultiTennantColumnFamilyDefinition.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/astyanax/MultiTennantColumnFamilyDefinition.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/astyanax/MultiTennantColumnFamilyDefinition.java
index ddbafc2..55c070e 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/astyanax/MultiTennantColumnFamilyDefinition.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/astyanax/MultiTennantColumnFamilyDefinition.java
@@ -39,7 +39,7 @@ public class MultiTennantColumnFamilyDefinition {
*/
public enum CacheOption {
/**
- * Use both row key cache and key cache
+ * Use both row key shard and key shard
*/
ALL( "ALL" ),
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
index ec19298..90b5d57 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
@@ -237,7 +237,7 @@ public class MvccLogEntrySerializationStrategyImpl implements MvccLogEntrySerial
/**
- * Internal stage cache
+ * Internal stage shard
*/
private static class StageCache {
private Map<Integer, Stage> values = new HashMap<Integer, Stage>( Stage.values().length );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/pom.xml b/stack/corepersistence/graph/pom.xml
index 64587a9..47c017d 100644
--- a/stack/corepersistence/graph/pom.xml
+++ b/stack/corepersistence/graph/pom.xml
@@ -39,6 +39,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-collections4</artifactId>
+ <version>4.0</version>
+ </dependency>
+
<!-- utilized for hyperloglog cardinality terms -->
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index c4fae69..0032190 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -49,15 +49,15 @@ public interface GraphFig extends GuicyFig {
public static final String READ_TIMEOUT = "usergrid.graph.read.timeout";
- public static final String CACHE_TIMEOUT = "usergrid.graph.shard.timeout";
+ public static final String SHARD_SIZE = "usergrid.graph.shard.size";
- public static final String CACHE_SIZE = "usergrid.graph.shard.size";
+ public static final String SHARD_CACHE_SIZE = "usergrid.graph.shard.cache.size";
public static final String SHARD_CACHE_TIMEOUT = "usergrid.graph.shard.cache.timeout";
public static final String COUNTER_PRECISION_LOSS = "usergrid.graph.shard.counter.precision.loss";
- @Default( "1000" )
+ @Default("1000")
@Key(SCAN_PAGE_SIZE)
int getScanPageSize();
@@ -69,8 +69,8 @@ public interface GraphFig extends GuicyFig {
@Key(WRITE_CL)
String getWriteCL();
- @Default( "10000" )
- @Key( WRITE_TIMEOUT )
+ @Default("10000")
+ @Key(WRITE_TIMEOUT)
int getWriteTimeout();
/**
@@ -97,23 +97,21 @@ public interface GraphFig extends GuicyFig {
int getRepairTimeout();
- @Default( "10000" )
- @Key( CACHE_SIZE )
- int getShardCacheSize();
-
- @Default( "30000" )
- @Key( CACHE_TIMEOUT )
- long getCacheTimeout();
+ @Default("10000")
+ @Key(SHARD_SIZE)
+ long getShardSize();
@Default(".02")
@Key(COUNTER_PRECISION_LOSS)
double getShardCounterPrecisionLoss();
- @Default( "10000" )
+ @Default("30000")
@Key(SHARD_CACHE_TIMEOUT)
long getShardCacheTimeout();
-
+ @Default( "250000" )
+ @Key( SHARD_CACHE_SIZE )
+ long getShardCacheSize();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocation.java
index bede2ab..05f6818 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocation.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocation.java
@@ -59,15 +59,4 @@ public interface NodeShardAllocation {
public boolean auditMaxShard(final OrganizationScope scope, final Id nodeId, final String... edgeType);
- /**
- * Increment the shard Id the specified amount
- * @param scope The scope
- * @param nodeId The node id
- * @param shardId The shard id
- * @param count The count
- * @param edgeType The edge type
- */
- public void increment(final OrganizationScope scope, final Id nodeId, final UUID shardId, int count, final String... edgeType);
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationImpl.java
index 4dc4b68..e6e8c66 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationImpl.java
@@ -20,9 +20,13 @@
package org.apache.usergrid.persistence.graph.impl.shard;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.UUID;
+import org.apache.commons.collections4.iterators.PushbackIterator;
+
import org.apache.usergrid.persistence.collection.OrganizationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.consistency.TimeService;
@@ -33,6 +37,7 @@ import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import com.fasterxml.uuid.UUIDComparator;
import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
@@ -49,101 +54,138 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
private final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization;
private final TimeService timeService;
private final GraphFig graphFig;
+ private final Keyspace keyspace;
@Inject
public NodeShardAllocationImpl( final EdgeSeriesSerialization edgeSeriesSerialization,
final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization,
- final TimeService timeService, final GraphFig graphFig ) {
+ final TimeService timeService, final GraphFig graphFig, final Keyspace keyspace ) {
this.edgeSeriesSerialization = edgeSeriesSerialization;
this.edgeSeriesCounterSerialization = edgeSeriesCounterSerialization;
this.timeService = timeService;
this.graphFig = graphFig;
+ this.keyspace = keyspace;
}
@Override
public Iterator<UUID> getShards( final OrganizationScope scope, final Id nodeId, final UUID maxShardId,
final int pageSize, final String... edgeTypes ) {
- return edgeSeriesSerialization.getEdgeMetaData( scope, nodeId, maxShardId, pageSize, edgeTypes );
- }
+ final Iterator<UUID> existingShards =
+ edgeSeriesSerialization.getEdgeMetaData( scope, nodeId, maxShardId, pageSize, edgeTypes );
- @Override
- public boolean auditMaxShard( final OrganizationScope scope, final Id nodeId, final String... edgeType ) {
+ final PushbackIterator<UUID> pushbackIterator = new PushbackIterator( existingShards );
- final UUID now = UUIDGenerator.newTimeUUID();
- Iterator<UUID> maxShards = getShards( scope, nodeId, MAX_UUID, 1, edgeType );
+ final UUID now = UUIDGenerator.newTimeUUID(timeService.getCurrentTime());
- //if the first shard has already been allocated, do nothing.
- //now is already > than the max, don't do anything
- if ( maxShards.hasNext() && UUIDComparator.staticCompare( now, maxShards.next() ) < 0 ) {
- return false;
+ final List<UUID> futures = new ArrayList<UUID>();
+
+
+ //loop through all shards, any shard > now+1 should be deleted
+ while ( pushbackIterator.hasNext() ) {
+
+ final UUID value = pushbackIterator.next();
+
+ //we're done, our current time uuid is greater than the value stored
+ if ( UUIDComparator.staticCompare( now, value ) > 0 ) {
+ //push it back into the iterator
+ pushbackIterator.pushback( value );
+ break;
+ }
+
+ futures.add( value );
}
- final long newShardTime = timeService.getCurrentTime() + graphFig.getCacheTimeout()*2;
- //allocate a new shard at least now+ 2x our shard timeout. We want to be sure that all replicas pick up on the new
- //shard
+ //we have more than 1 future value, we need to remove it
+
+ MutationBatch cleanup = keyspace.prepareMutationBatch();
+
+ //remove all futures except the last one, it is the only value we shouldn't lazy remove
+ for ( int i = futures.size() - 2; i > 0; i-- ) {
+ final UUID toRemove = futures.get( i );
+
+ final MutationBatch batch = edgeSeriesSerialization.removeEdgeMeta( scope, nodeId, toRemove, edgeTypes );
+
+ cleanup.mergeShallow( batch );
+ }
- final UUID futureUUID = UUIDGenerator.newTimeUUID(newShardTime);
try {
- this.edgeSeriesSerialization.writeEdgeMeta( scope, nodeId, futureUUID, edgeType ).execute();
+ cleanup.execute();
}
catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to write the new edge metadata" );
+ throw new RuntimeException( "Unable to remove future shards, mutation error", e );
}
- UUID max = null;
+ final int futuresSize = futures.size();
- MutationBatch rollup = null;
+ if ( futuresSize > 0 ) {
+ pushbackIterator.pushback( futures.get( futuresSize - 1 ) );
+ }
- Iterator<UUID> shards = getShards( scope, nodeId, MAX_UUID, 1000, edgeType );
- while ( shards.hasNext() ) {
+ return pushbackIterator;
+ }
- final UUID shardId = shards.next();
- if ( UUIDComparator.staticCompare( shardId, max ) >= 0 ) {
- break;
- }
+ @Override
+ public boolean auditMaxShard( final OrganizationScope scope, final Id nodeId, final String... edgeType ) {
+ final UUID now = UUIDGenerator.newTimeUUID( timeService.getCurrentTime() );
- //remove the edge that is too large from the node shard allocation
- final MutationBatch batch = edgeSeriesSerialization.removeEdgeMeta( scope, nodeId, shardId, edgeType );
+ final Iterator<UUID> maxShards = getShards( scope, nodeId, MAX_UUID, 1, edgeType );
- if ( rollup == null ) {
- rollup = batch;
- }
- else {
- rollup.mergeShallow( batch );
- }
+ //if the first shard has already been allocated, do nothing.
- //while our max value is > than the value we just created, delete it
+ //now is already > than the max, don't do anything
+ if ( !maxShards.hasNext() ) {
+ return false;
}
- if ( rollup != null ) {
- try {
- rollup.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to cleanup allocated shards" );
- }
+ final UUID maxShard = maxShards.next();
+
+ /**
+ * Nothing to do, it's already in the future
+ */
+ if ( UUIDComparator.staticCompare( maxShard, now ) > 0 ) {
+ return false;
}
- return true;
- }
+ /**
+ * Check out if we have a count for our shard allocation
+ */
+ final long count = edgeSeriesCounterSerialization.getCount( scope, nodeId, maxShard, edgeType );
- @Override
- public void increment( final OrganizationScope scope, final Id nodeId, final UUID shardId, final int count,
- final String... edgeType ) {
- //delegate
-// edgeSeriesCounterSerialization.incrementMetadataCount( scope, nodeId, shardId, count, edgeType );
+ if ( count < graphFig.getShardSize() ) {
+ return false;
+ }
+
+ //try to get a lock here, and fail if one isn't present
+
+ final long newShardTime = timeService.getCurrentTime() + graphFig.getShardCacheTimeout() * 2;
+
+ //allocate a new shard at least now+ 2x our shard timeout. We want to be sure that all replicas pick up on
+ // the new
+ //shard
+
+ final UUID futureUUID = UUIDGenerator.newTimeUUID( newShardTime );
+
+ try {
+ this.edgeSeriesSerialization.writeEdgeMeta( scope, nodeId, futureUUID, edgeType ).execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to write the new edge metadata" );
+ }
+
+
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximation.java
index 8eda5c7..f86b95e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximation.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximation.java
@@ -33,19 +33,21 @@ public interface NodeShardApproximation {
/**
- * Increment the shard Id the specified amount
- * @param scope The scope
- * @param nodeId The node id
- * @param shardId The shard id
- * @param edgeType The edge type
- */
- public void increment(final OrganizationScope scope, final Id nodeId, final UUID shardId, final String... edgeType);
+ * Increment the shard Id the specified amount
+ *
+ * @param scope The scope
+ * @param nodeId The node id
+ * @param shardId The shard id
+ * @param count
+ * @param edgeType The edge type
+ */
+ public void increment( final OrganizationScope scope, final Id nodeId, final UUID shardId, final long count,
+ final String... edgeType );
/**
* Get the approximation of the number of unique items
- * @return
*/
- public long getCount(final OrganizationScope scope, final Id nodeId, final UUID shardId, final String... edgeType);
-
+ public long getCount( final OrganizationScope scope, final Id nodeId, final UUID shardId,
+ final String... edgeType );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximationImpl.java
index 5a3db86..d8b0b63 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximationImpl.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
@@ -60,36 +61,26 @@ import com.netflix.astyanax.serializers.UUIDSerializer;
public class NodeShardApproximationImpl implements NodeShardApproximation {
- //TODO T.N. refactor into an expiring local cache. We need each hyperlog to be it's own instance of a given shard
- //if this is to work\
- private static final UUIDSerializer UUID_SERIALIZER = UUIDSerializer.get();
-
- /**
- * We generate a new time uuid every time a new instance is started. We should never re-use an instance.
- */
- private final UUID identity = UUIDGenerator.newTimeUUID();
-
private final GraphFig graphFig;
- private final LoadingCache<ShardKey, HyperLogLog> graphLogs;
-
- private final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization;
+ //TODO, replace with with our counters. This is just a POC for now. HyperLogLog appears to use too much ram, waiting
+ //to hear back on http://dsiutils.di.unimi.it/#install
+ //for our use case
+ private final LoadingCache<ShardKey, AtomicLong> graphLogs;
/**
* Create a time shard approximation with the correct configuration.
*/
@Inject
- public NodeShardApproximationImpl( final GraphFig graphFig,
- final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization ) {
+ public NodeShardApproximationImpl( final GraphFig graphFig) {
this.graphFig = graphFig;
- this.edgeSeriesCounterSerialization = edgeSeriesCounterSerialization;
graphLogs = CacheBuilder.newBuilder()
- .maximumSize( graphFig.getShardCacheTimeout() )
- .build( new CacheLoader<ShardKey, HyperLogLog>() {
- public HyperLogLog load( ShardKey key ) {
- return loadCache( key );
+ .maximumSize( graphFig.getShardCacheSize() )
+ .build( new CacheLoader<ShardKey, AtomicLong>() {
+ public AtomicLong load( ShardKey key ) {
+ return new AtomicLong( );
}
} );
@@ -97,23 +88,14 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
@Override
- public void increment( final OrganizationScope scope, final Id nodeId, final UUID shardId,
+ public void increment( final OrganizationScope scope, final Id nodeId, final UUID shardId, final long count,
final String... edgeType ) {
-
- final ByteBuffer buff = UUID_SERIALIZER.toByteBuffer( shardId );
-
- byte[] bytes = buff.array();
-
-
-
- long longHash = MurmurHash.hash64(bytes, bytes.length );
-
final ShardKey key = new ShardKey( scope, nodeId, shardId, edgeType );
try {
- graphLogs.get( key).offerHashed( longHash );
+ graphLogs.get( key).addAndGet(count);
}
catch ( ExecutionException e ) {
throw new RuntimeException( "Unable to get hyperloglog from cache", e );
@@ -123,42 +105,23 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
}
- private HyperLogLog loadCache(ShardKey key){
-// edgeSeriesCounterSerialization
- return null;
- }
-
-
@Override
public long getCount( final OrganizationScope scope, final Id nodeId, final UUID shardId,
final String... edgeType ) {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- private byte[] hash( final OrganizationScope scope, final Id nodeId, final UUID shardId,
- final String... edgeTypes ) {
- StringBuilder builder = new StringBuilder();
-
- final Id organization = scope.getOrganization();
-
- builder.append( organization.getUuid() );
- builder.append( organization.getType() );
- builder.append( nodeId.getUuid() );
- builder.append( nodeId.getType() );
+ final ShardKey key = new ShardKey( scope, nodeId, shardId, edgeType );
- builder.append( shardId.toString() );
- for ( String edgeType : edgeTypes ) {
- builder.append( edgeType );
+ try {
+ return graphLogs.get( key ).get();
+ }
+ catch ( ExecutionException e ) {
+ throw new RuntimeException("Unable to execute cache get", e);
}
-
- return null;
-// return builder.toString().getBytes( CHARSET );
}
+
/**
* Internal class for shard keys
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheImpl.java
index 714791f..fd03fc4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheImpl.java
@@ -77,7 +77,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
public void propertyChange( final PropertyChangeEvent evt ) {
final String propertyName = evt.getPropertyName();
- if ( propertyName.equals( GraphFig.CACHE_SIZE ) || propertyName.equals( GraphFig.CACHE_TIMEOUT ) ) {
+ if ( propertyName.equals( GraphFig.SHARD_CACHE_SIZE ) || propertyName.equals( GraphFig.SHARD_CACHE_TIMEOUT ) ) {
updateCache();
}
}
@@ -122,23 +122,26 @@ public class NodeShardCacheImpl implements NodeShardCache {
private void updateCache() {
this.graphs = CacheBuilder.newBuilder().maximumSize( graphFig.getShardCacheSize() )
- .expireAfterWrite( graphFig.getCacheTimeout(), TimeUnit.MILLISECONDS )
+ .expireAfterWrite( graphFig.getShardCacheSize(), TimeUnit.MILLISECONDS )
.build( new CacheLoader<CacheKey, CacheEntry>() {
@Override
public CacheEntry load( final CacheKey key ) throws Exception {
- //TODO, we need to put some sort of upper bounds on this, it could possibly
- //get too large
- final Iterator<UUID> edges = nodeShardAllocation
- .getShards( key.scope, key.id, MAX_UUID, SHARD_PAGE_SIZE, key.types );
-
/**
- * Perform an async audit in case we need to allocate a new shard
+ * Perform an audit in case we need to allocate a new shard
*/
nodeShardAllocation.auditMaxShard( key.scope, key.id, key.types );
+ //TODO, we need to put some sort of upper bounds on this, it could possibly
+ //get too large
+
+
+
+
+ final Iterator<UUID> edges = nodeShardAllocation
+ .getShards( key.scope, key.id, MAX_UUID, SHARD_PAGE_SIZE, key.types );
return new CacheEntry( edges );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesCounterSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesCounterSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesCounterSerialization.java
index c706137..f8fa615 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesCounterSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesCounterSerialization.java
@@ -59,18 +59,8 @@ public interface EdgeSeriesCounterSerialization {
* @param shardId The shard Id to use
* @param types The types to write to. Can be edge type, or edgeType+id type
*/
- public MutationBatch writeMetaDataLog( OrganizationScope scope, Id nodeId, UUID shardId, HyperLogLog log, UUID workerId, String... types );
+ public MutationBatch writeMetaDataLog( OrganizationScope scope, Id nodeId, UUID shardId, long count, String... types );
- /**
- * return all persistent instance of the hyperlog log
- * @param scope
- * @param nodeId
- * @param shardId
- * @param types
- * @return
- */
-
- public List<HyperLogLog> getMetadataLog( OrganizationScope scope, Id nodeId, UUID shardId, String... types );
/**
* Get the most recent rollup of all of the given summations. If one is not present the optional will be empty
@@ -80,28 +70,8 @@ public interface EdgeSeriesCounterSerialization {
* @param types
* @return
*/
- public Optional<HyperLogLog> getSummationLog(OrganizationScope scope, Id nodeId, UUID shardId, String... types);
-
- /**
- * Write the summation log. Uses the timestamp passed in the column
- * @param scope The scope to write
- * @param nodeId The id in the edge
- * @param shardId The shard Id to use
- * @param log The log to write
- * @param creatorId The identifier of this writer
- * @param types The types to write to. Can be edge type, or edgeType+id type
- */
- public MutationBatch writeSummationLog( OrganizationScope scope, Id nodeId, UUID shardId, HyperLogLog log, UUID workerId, String... types );
+ public long getCount(OrganizationScope scope, Id nodeId, UUID shardId, String... types);
- /**
- * Remove the slice from the edge meta data from the types.
- * @param scope
- * @param nodeId
- * @param shardId
- * @param types
- * @return
- */
- public MutationBatch removeEdgeMetadataCount( OrganizationScope scope, Id nodeId, UUID shardId, String... types );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java
index 17c342f..a60e96e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java
@@ -45,7 +45,7 @@ public interface EdgeSeriesSerialization {
public MutationBatch writeEdgeMeta(OrganizationScope scope, Id nodeId, UUID slice, String... types);
/**
- * Get an iterator of all meta data and types
+ * Get an iterator of all meta data and types. Returns a range from High to low
* @param scope The organization scope
* @param nodeId The id of the node
* @param start The uuid to start seeking from. Values <= this value will be returned.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/HyperLogLogTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/HyperLogLogTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/HyperLogLogTest.java
index e40bf6c..241fd62 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/HyperLogLogTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/HyperLogLogTest.java
@@ -43,7 +43,7 @@ public class HyperLogLogTest {
private static final UUIDSerializer UUID_SER = UUIDSerializer.get();
- private static final double LOSS = 0.01d;
+ private static final double LOSS = 0.10d;
@Test
@@ -133,9 +133,9 @@ public class HyperLogLogTest {
log.info( "Expected max is {}", max );
-// assertTrue( "Min is <= the cardinality", min <= cardinality );
-//
-// assertTrue( "Cardinality is <= max ", cardinality <= max );
+ assertTrue( "Min is <= the cardinality", min <= cardinality );
+
+ assertTrue( "Cardinality is <= max ", cardinality <= max );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationTest.java
new file mode 100644
index 0000000..ee4472d
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationTest.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.impl.shard;
+
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.consistency.TimeService;
+import org.apache.usergrid.persistence.graph.impl.Constants;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSeriesCounterSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSeriesSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class NodeShardAllocationTest {
+
+
+ private GraphFig graphFig;
+
+
+ protected OrganizationScope scope;
+
+
+ @Before
+ public void setup() {
+ scope = mock( OrganizationScope.class );
+
+ Id orgId = mock( Id.class );
+
+ when( orgId.getType() ).thenReturn( "organization" );
+ when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+ when( scope.getOrganization() ).thenReturn( orgId );
+
+ graphFig = mock( GraphFig.class );
+
+ when( graphFig.getShardCacheSize() ).thenReturn( 10000l );
+ when( graphFig.getShardSize() ).thenReturn( 20000l );
+ when( graphFig.getShardCacheTimeout()).thenReturn( 30000l );
+ }
+
+
+ @Test
+ public void noShards() {
+ final EdgeSeriesSerialization edgeSeriesSerialization = mock( EdgeSeriesSerialization.class );
+
+ final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization =
+ mock( EdgeSeriesCounterSerialization.class );
+
+
+ final TimeService timeService = mock( TimeService.class );
+
+ final Keyspace keyspace = mock( Keyspace.class );
+
+ final MutationBatch batch = mock( MutationBatch.class );
+
+ when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+ NodeShardAllocation approximation =
+ new NodeShardAllocationImpl( edgeSeriesSerialization, edgeSeriesCounterSerialization, timeService,
+ graphFig, keyspace );
+
+ final Id nodeId = createId( "test" );
+ final String type = "type";
+ final String subType = "subType";
+
+ /**
+ * Mock up returning an empty iterator, our audit shouldn't create a new shard
+ */
+ when( edgeSeriesSerialization
+ .getEdgeMetaData( same( scope ), same( nodeId ), any( UUID.class ), eq( 1 ), same( type ),
+ same( subType ) ) ).thenReturn( Collections.<UUID>emptyList().iterator() );
+
+ final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+
+ assertFalse( "No shard allocated", result );
+ }
+
+
+ @Test
+ public void existingFutureShard() {
+ final EdgeSeriesSerialization edgeSeriesSerialization = mock( EdgeSeriesSerialization.class );
+
+ final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization =
+ mock( EdgeSeriesCounterSerialization.class );
+
+
+ final TimeService timeService = mock( TimeService.class );
+
+ final Keyspace keyspace = mock( Keyspace.class );
+
+
+ final MutationBatch batch = mock( MutationBatch.class );
+
+ when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+
+ NodeShardAllocation approximation =
+ new NodeShardAllocationImpl( edgeSeriesSerialization, edgeSeriesCounterSerialization, timeService,
+ graphFig, keyspace );
+
+ final Id nodeId = createId( "test" );
+ final String type = "type";
+ final String subType = "subType";
+
+
+ final long timeservicetime = System.currentTimeMillis();
+
+ when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
+
+ final UUID futureShard = UUIDGenerator.newTimeUUID( timeservicetime + graphFig.getShardCacheTimeout() * 2 );
+
+ /**
+ * Mock up returning a min shard, and a future shard
+ */
+ when( edgeSeriesSerialization
+ .getEdgeMetaData( same( scope ), same( nodeId ), any( UUID.class ), eq( 1 ), same( type ),
+ same( subType ) ) ).thenReturn( Arrays.asList( futureShard ).iterator() );
+
+ final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+
+ assertFalse( "No shard allocated", result );
+ }
+
+
+ @Test
+ public void lowCountFutureShard() {
+ final EdgeSeriesSerialization edgeSeriesSerialization = mock( EdgeSeriesSerialization.class );
+
+ final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization =
+ mock( EdgeSeriesCounterSerialization.class );
+
+
+ final TimeService timeService = mock( TimeService.class );
+
+ final Keyspace keyspace = mock( Keyspace.class );
+
+ final MutationBatch batch = mock( MutationBatch.class );
+
+ when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+
+ NodeShardAllocation approximation =
+ new NodeShardAllocationImpl( edgeSeriesSerialization, edgeSeriesCounterSerialization, timeService,
+ graphFig, keyspace );
+
+ final Id nodeId = createId( "test" );
+ final String type = "type";
+ final String subType = "subType";
+
+
+ final long timeservicetime = System.currentTimeMillis();
+
+ when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
+
+
+ /**
+ * Mock up returning a min shard, and a future shard
+ */
+ when( edgeSeriesSerialization
+ .getEdgeMetaData( same( scope ), same( nodeId ), any( UUID.class ), eq( 1 ), same( type ),
+ same( subType ) ) ).thenReturn( Arrays.asList( Constants.MIN_UUID ).iterator() );
+
+
+ //return a shard size < our max by 1
+
+ final long count = graphFig.getShardSize() - 1;
+
+ when( edgeSeriesCounterSerialization
+ .getCount( same( scope ), same( nodeId ), eq( Constants.MIN_UUID ), same( type ), same( subType ) ) )
+ .thenReturn( count );
+
+ final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+
+ assertFalse( "Shard allocated", result );
+ }
+
+
+ @Test
+ public void equalCountFutureShard() {
+ final EdgeSeriesSerialization edgeSeriesSerialization = mock( EdgeSeriesSerialization.class );
+
+ final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization =
+ mock( EdgeSeriesCounterSerialization.class );
+
+
+ final TimeService timeService = mock( TimeService.class );
+
+ final Keyspace keyspace = mock( Keyspace.class );
+
+ final MutationBatch batch = mock(MutationBatch.class);
+
+ when(keyspace.prepareMutationBatch()).thenReturn( batch );
+
+
+ NodeShardAllocation approximation =
+ new NodeShardAllocationImpl( edgeSeriesSerialization, edgeSeriesCounterSerialization, timeService,
+ graphFig, keyspace );
+
+ final Id nodeId = createId( "test" );
+ final String type = "type";
+ final String subType = "subType";
+
+
+ final long timeservicetime = System.currentTimeMillis();
+
+ when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
+
+
+ /**
+ * Mock up returning a min shard
+ */
+ when( edgeSeriesSerialization
+ .getEdgeMetaData( same( scope ), same( nodeId ), any( UUID.class ), eq( 1 ), same( type ),
+ same( subType ) ) ).thenReturn( Arrays.asList( Constants.MIN_UUID ).iterator() );
+
+
+ final long shardCount = graphFig.getShardSize();
+
+ //return a shard size equal to our max
+ when( edgeSeriesCounterSerialization
+ .getCount( same( scope ), same( nodeId ), eq( Constants.MIN_UUID ), same( type ), same( subType ) ) )
+ .thenReturn( shardCount );
+
+ ArgumentCaptor<UUID> newUUIDValue = ArgumentCaptor.forClass( UUID.class );
+
+
+ //mock up our mutation
+ when( edgeSeriesSerialization
+ .writeEdgeMeta( same( scope ), same( nodeId ), newUUIDValue.capture(), same( type ), same( subType ) ) )
+ .thenReturn( mock( MutationBatch.class ) );
+
+
+ final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+
+ assertTrue( "Shard allocated", result );
+
+ //check our new allocated UUID
+
+ final long expectedUUIDTime = timeservicetime + 2 * graphFig.getShardCacheTimeout();
+
+ UUID expectedUUID = UUIDGenerator.newTimeUUID( expectedUUIDTime );
+
+ assertEquals( "Expected UUID at 2x timeout generated", expectedUUID, newUUIDValue.getValue() );
+ }
+
+
+
+
+ @Test
+ public void futureCountShardCleanup() {
+ final EdgeSeriesSerialization edgeSeriesSerialization = mock( EdgeSeriesSerialization.class );
+
+ final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization =
+ mock( EdgeSeriesCounterSerialization.class );
+
+
+ final TimeService timeService = mock( TimeService.class );
+
+ final Keyspace keyspace = mock( Keyspace.class );
+
+ final MutationBatch batch = mock(MutationBatch.class);
+
+ when(keyspace.prepareMutationBatch()).thenReturn( batch );
+
+
+ NodeShardAllocation approximation =
+ new NodeShardAllocationImpl( edgeSeriesSerialization, edgeSeriesCounterSerialization, timeService,
+ graphFig, keyspace );
+
+ final Id nodeId = createId( "test" );
+ final String type = "type";
+ final String subType = "subType";
+
+
+ /**
+ * Use the time service to generate UUIDS
+ */
+ final long timeservicetime = System.currentTimeMillis();
+
+
+ when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
+
+ assertTrue("Shard cache mocked", graphFig.getShardCacheTimeout() > 0);
+
+
+ /**
+ * Simulates clock drift when 2 nodes create future shards near one another
+ */
+ final long futureTime = timeService.getCurrentTime() + 2 * graphFig.getShardCacheTimeout();
+
+
+ assertTrue("Future time is actually in the future", futureTime > timeService.getCurrentTime());
+
+
+
+
+ UUID futureUUID1 = UUIDGenerator.newTimeUUID(futureTime);
+
+ UUID futureUUID2 = UUIDGenerator.newTimeUUID(futureTime+1);
+
+ UUID futureUUID3 = UUIDGenerator.newTimeUUID(futureTime+2);
+
+
+ UUID now = UUIDGenerator.newTimeUUID( timeService.getCurrentTime() );
+
+
+ //verify all future IDS are greater than "now" if they're not, then there's something wrong with the
+ //UUID Generation, which is crucial to this functionality
+
+ assertTrue( UUIDComparator.staticCompare( futureUUID1, now ) > 0);
+
+ assertTrue( UUIDComparator.staticCompare( futureUUID2, now ) > 0);
+
+ assertTrue( UUIDComparator.staticCompare( futureUUID3, now ) > 0);
+
+
+ final int pageSize = 100;
+
+ /**
+ * Mock up returning a min shard
+ */
+ when( edgeSeriesSerialization
+ .getEdgeMetaData( same( scope ), same( nodeId ), any( UUID.class ), eq( pageSize ), same( type ),
+ same( subType ) ) ).thenReturn( Arrays.asList(futureUUID3, futureUUID2, futureUUID1, Constants.MIN_UUID ).iterator() );
+
+
+
+ ArgumentCaptor<UUID> newUUIDValue = ArgumentCaptor.forClass( UUID.class );
+
+
+
+
+ //mock up our mutation
+ when( edgeSeriesSerialization
+ .removeEdgeMeta( same( scope ), same( nodeId ), newUUIDValue.capture(), same( type ), same( subType ) ) )
+ .thenReturn( mock( MutationBatch.class ) );
+
+
+ final Iterator<UUID>
+ result = approximation.getShards( scope, nodeId, Constants.MAX_UUID, pageSize, type, subType );
+
+
+ assertTrue("Shards present", result.hasNext());
+
+ assertEquals("Only single next shard returned", futureUUID1, result.next());
+
+ assertTrue("Shards present", result.hasNext());
+
+ assertEquals("Previous shard present", Constants.MIN_UUID, result.next());
+
+ assertFalse("No shards left", result.hasNext());
+
+ /**
+ * Now we need to verify that both our mutations have been added
+ */
+
+ List<UUID> values = newUUIDValue.getAllValues();
+
+ assertEquals("2 values removed", values.size());
+
+ assertEquals("Deleted Max Future", futureUUID3, values.get( 0 ));
+ assertEquals("Deleted Next Future", futureUUID2, values.get( 1 ));
+
+
+
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximationTest.java
new file mode 100644
index 0000000..1b7725f
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardApproximationTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.impl.shard;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSeriesCounterSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class NodeShardApproximationTest {
+
+
+ private GraphFig graphFig;
+
+
+ protected OrganizationScope scope;
+
+
+ @Before
+ public void setup() {
+ scope = mock( OrganizationScope.class );
+
+ Id orgId = mock( Id.class );
+
+ when( orgId.getType() ).thenReturn( "organization" );
+ when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+ when( scope.getOrganization() ).thenReturn( orgId );
+
+ graphFig = mock( GraphFig.class );
+
+ when( graphFig.getShardCacheSize() ).thenReturn( 10000l );
+ when(graphFig.getShardSize()).thenReturn( 250000l );
+ }
+
+
+ @Test
+ public void testSingleShard() {
+
+ EdgeSeriesCounterSerialization ser = mock( EdgeSeriesCounterSerialization.class );
+
+ NodeShardApproximation approximation = new NodeShardApproximationImpl( graphFig );
+
+
+ final Id id = createId( "test" );
+ final UUID shardId = UUIDGenerator.newTimeUUID();
+ final String type = "type";
+ final String type2 = "subType";
+
+ long count = approximation.getCount( scope, id, shardId, type, type2 );
+
+ assertEquals( 0, count );
+ }
+
+
+ @Test
+ public void testMultipleShard() throws ExecutionException, InterruptedException {
+
+
+ final NodeShardApproximation approximation = new NodeShardApproximationImpl( graphFig );
+
+
+ final int increments = 1000000;
+ final int workers = 100;
+
+ final Id id = createId( "test" );
+ final String type = "type";
+ final String type2 = "subType";
+
+ ExecutorService executor = Executors.newFixedThreadPool( workers );
+
+ List<Future<Long>> futures = new ArrayList<>( workers );
+
+ for ( int i = 0; i < workers; i++ ) {
+
+ final Future<Long> future = executor.submit( new Callable<Long>() {
+ @Override
+ public Long call() throws Exception {
+
+ final UUID shardId = UUIDGenerator.newTimeUUID();
+
+
+ long count = approximation.getCount( scope, id, shardId, type, type2 );
+
+ assertEquals( 0, count );
+
+ for ( int i = 0; i < increments; i++ ) {
+ approximation.increment( scope, id, shardId, 1, type, type2 );
+ }
+
+ return approximation.getCount( scope, id, shardId, type, type2 );
+ }
+ } );
+
+ futures.add( future );
+ }
+
+
+ for ( Future<Long> future : futures ) {
+ final long value = future.get().longValue();
+
+ assertEquals( increments, value );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheTest.java
index b0acd5a..b56711b 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheTest.java
@@ -250,8 +250,8 @@ public class NodeShardCacheTest {
private GraphFig getFigMock() {
final GraphFig graphFig = mock( GraphFig.class );
- when( graphFig.getShardCacheSize() ).thenReturn( 1000 );
- when( graphFig.getCacheTimeout() ).thenReturn( 30000l );
+ when( graphFig.getShardCacheSize() ).thenReturn( 1000l );
+ when( graphFig.getShardCacheTimeout() ).thenReturn( 30000l );
return graphFig;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/model/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/model/pom.xml b/stack/corepersistence/model/pom.xml
index 42d64ec..c671128 100644
--- a/stack/corepersistence/model/pom.xml
+++ b/stack/corepersistence/model/pom.xml
@@ -41,6 +41,14 @@
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>${commons.lang.version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/UUIDGenerator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/UUIDGenerator.java b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/UUIDGenerator.java
index f2c4fa8..af09915 100644
--- a/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/UUIDGenerator.java
+++ b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/UUIDGenerator.java
@@ -1,93 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.usergrid.persistence.model.util;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
import com.fasterxml.uuid.EthernetAddress;
import com.fasterxml.uuid.TimestampSynchronizer;
import com.fasterxml.uuid.UUIDTimer;
import com.fasterxml.uuid.impl.TimeBasedGenerator;
+import static com.fasterxml.uuid.impl.UUIDUtil.BYTE_OFFSET_CLOCK_HI;
+import static com.fasterxml.uuid.impl.UUIDUtil.BYTE_OFFSET_CLOCK_LO;
+import static com.fasterxml.uuid.impl.UUIDUtil.BYTE_OFFSET_CLOCK_MID;
+import static com.fasterxml.uuid.impl.UUIDUtil.BYTE_OFFSET_CLOCK_SEQUENCE;
+
/**
- * TODO replace this with the Astyanax generator libs
- * @author: tnine
- *
+ * TODO replace this with the existing usergridUUID utils, we seem to be getting inconsistent behavior from it
*/
public class UUIDGenerator {
- private static final TimestampSynchronizer synchronize = new TimestampSynchronizer() {
+ private static final int[] MICROS = new int[1000];
+
+
+ static {
+ for ( int x = 0; x < 1000; x++ ) {
+ MICROS[x] = x * 10;
+ }
+ }
+
+
+ private static ReentrantLock tsLock = new ReentrantLock( true );
+
+ public static final UUID MIN_TIME_UUID = UUID.fromString( "00000000-0000-1000-8000-000000000000" );
+
+ public static final UUID MAX_TIME_UUID = UUID.fromString( "ffffffff-ffff-1fff-bfff-ffffffffffff" );
+
+ public static final UUID ZERO_UUID = new UUID( 0, 0 );
+
+ private static long timestampMillisNow = System.currentTimeMillis();
+
+ private static AtomicInteger currentMicrosPoint = new AtomicInteger( 0 );
+ private static AtomicInteger customMicrosPointer = new AtomicInteger( 0 );
- /**
- * Pointer to the last value we returned
- */
- private long last = 0;
/**
- * The number of ticks that can be used in the millisecond. In a time UUID a tick is divided into 1/10000 of
- * a millisecond
- *
+ * Return the "next" UUID in micro second resolution. <b>WARNING</b>: this is designed to return the next unique
+ * timestamped UUID for this JVM. Depending on velocity of the call, this method may block internally to insure that
+ * "now" is kept in sync with the UUIDs being generated by this call.
+ * <p/>
+ * In other words, we will intentionally burn CPU insuring that this method is not executed more than 10k -1 times
+ * per millisecond and guarantee that those microseconds held within are sequential.
+ * <p/>
+ * If we did not do this, you would get <b>timestamp collision</b> even though the UUIDs will technically be
+ * 'unique.'
*/
- private AtomicInteger ticks = new AtomicInteger();
+ public static UUID newTimeUUID() {
+ // get & inc counter, but roll on 1k (because we divide by 10 on retrieval)
+ // if count + currentMicro > 1k, block and roll
+ tsLock.lock();
+ long ts = System.currentTimeMillis();
+ if ( ts > timestampMillisNow ) {
+ timestampMillisNow = ts;
+ currentMicrosPoint.set( 0 );
+ }
+ int pointer = currentMicrosPoint.getAndIncrement();
+ try {
+ if ( pointer > 990 ) {
+ TimeUnit.MILLISECONDS.sleep( 1L );
+ }
+ }
+ catch ( Exception ex ) {
+ ex.printStackTrace();
+ }
+ finally {
+ tsLock.unlock();
+ }
+ return newTimeUUID( ts, MICROS[pointer] );
+ }
- @Override
- protected long initialize() throws IOException {
+ private static final long KCLOCK_OFFSET = 0x01b21dd213814000L;
+ private static final long KCLOCK_MULTIPLIER_L = 10000L;
- last = System.currentTimeMillis();
- return last;
- }
+ private static final Random CLOCK_SEQ_RANDOM = new Random();
- @Override
- protected void deactivate() throws IOException {
- //no op
+ // 14 bits of randomness
+ private static int getRandomClockSequence() {
+ return CLOCK_SEQ_RANDOM.nextInt() & 0x3FFF;
}
- @Override
- protected long update( long now ) throws IOException {
- /**
- * Our timestamp is greater just use that and reset last
- */
- if ( now > last ) {
- last = now;
- ticks.set( 0 );
- return last;
- }
+ private static void setTimestamp( long timestamp, byte[] uuidBytes, int clockSeq, int timeOffset ) {
+
+ timestamp *= KCLOCK_MULTIPLIER_L;
+ timestamp += KCLOCK_OFFSET;
+ timestamp += timeOffset;
- //we have the same value (since now should always be increasing) increment a tick
- return last + ticks.incrementAndGet();
+ // Set random clock sequence
+ uuidBytes[BYTE_OFFSET_CLOCK_SEQUENCE] = ( byte ) ( clockSeq >> 8 );
+ uuidBytes[BYTE_OFFSET_CLOCK_SEQUENCE + 1] = ( byte ) clockSeq;
+
+ // Set variant
+ uuidBytes[BYTE_OFFSET_CLOCK_SEQUENCE] &= 0x3F;
+ uuidBytes[BYTE_OFFSET_CLOCK_SEQUENCE] |= 0x80;
+ setTime( uuidBytes, timestamp );
}
- };
- private static final Random random = new Random();
- private static final UUIDTimer timer;
+ @SuppressWarnings("all")
+ private static void setTime( byte[] uuidBytes, long timestamp ) {
+
+ // Time fields aren't nicely split across the UUID, so can't just
+ // linearly dump the stamp:
+ int clockHi = ( int ) ( timestamp >>> 32 );
+ int clockLo = ( int ) timestamp;
+
+ uuidBytes[BYTE_OFFSET_CLOCK_HI] = ( byte ) ( clockHi >>> 24 );
+ uuidBytes[BYTE_OFFSET_CLOCK_HI + 1] = ( byte ) ( clockHi >>> 16 );
+ uuidBytes[BYTE_OFFSET_CLOCK_MID] = ( byte ) ( clockHi >>> 8 );
+ uuidBytes[BYTE_OFFSET_CLOCK_MID + 1] = ( byte ) clockHi;
+ uuidBytes[BYTE_OFFSET_CLOCK_LO] = ( byte ) ( clockLo >>> 24 );
+ uuidBytes[BYTE_OFFSET_CLOCK_LO + 1] = ( byte ) ( clockLo >>> 16 );
+ uuidBytes[BYTE_OFFSET_CLOCK_LO + 2] = ( byte ) ( clockLo >>> 8 );
+ uuidBytes[BYTE_OFFSET_CLOCK_LO + 3] = ( byte ) clockLo;
- /**
- * Lame, but required
- */
- static {
- try {
- timer = new UUIDTimer( random, synchronize );
+ // Set version
+ uuidBytes[BYTE_OFFSET_CLOCK_HI] &= 0x0F;
+ uuidBytes[BYTE_OFFSET_CLOCK_HI] |= 0x10;
}
- catch ( IOException e ) {
- throw new RuntimeException( "Couldn't intialize timer", e );
+
+
+ /**
+ * Generate a timeuuid with the given timestamp in milliseconds and the time offset. Useful when you need to
+ * generate sequential UUIDs for the same period in time. I.E
+ * <p/>
+ * newTimeUUID(1000, 0) <br/> newTimeUUID(1000, 1) <br /> newTimeUUID(1000, 2) <br />
+ * <p/>
+ * etc.
+ * <p/>
+ * Only use this method if you are absolutely sure you need it. When it doubt use the method without the timestamp
+ * offset
+ *
+ * @param ts The timestamp in milliseconds
+ * @param timeoffset The offset, which should always be <= 10000. If you go beyond this range, the millisecond will
+ * be incremented since this is beyond the possible values when converting from millis to 1/10 microseconds stored
+ * in the time uuid.
+ */
+ public static UUID newTimeUUID( long ts, int timeoffset ) {
+ if ( ts == 0 ) {
+ return newTimeUUID();
+ }
+
+ byte[] uuidBytes = new byte[16];
+ // 47 bits of randomness
+ EthernetAddress eth = EthernetAddress.constructMulticastAddress();
+ eth.toByteArray( uuidBytes, 10 );
+ setTimestamp( ts, uuidBytes, getRandomClockSequence(), timeoffset );
+
+ return UUID.nameUUIDFromBytes( uuidBytes );
}
- }
- private static final TimeBasedGenerator generator = new TimeBasedGenerator( EthernetAddress.fromInterface(), timer );
+ /**
+ * Generate a new UUID with the given time stamp in milliseconds. This method guarantees that subsequent calls will
+ * be of increasing value chronologically. If a large number of subsequent calls are made to this method (>1000)
+ * with the same timestamp, you will have non-unique temporal values stored in your UUID.
+ */
+ public static UUID newTimeUUID( long ts ) {
+ tsLock.lock();
+ int pointer = customMicrosPointer.getAndIncrement();
+ try {
+ if ( pointer > 990 ) {
+ customMicrosPointer.set( 0 );
+ }
+ }
+ finally {
+ tsLock.unlock();
+ }
+ return newTimeUUID( ts, MICROS[pointer] );
+ }
- /** Create a new time uuid */
- public static UUID newTimeUUID() {
- return generator.generate();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/UUIDUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/UUIDUtils.java b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/UUIDUtils.java
new file mode 100644
index 0000000..1d76e9c
--- /dev/null
+++ b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/UUIDUtils.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.model.util;
+
+
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.fasterxml.uuid.EthernetAddress;
+
+import static com.fasterxml.uuid.impl.UUIDUtil.BYTE_OFFSET_CLOCK_HI;
+import static com.fasterxml.uuid.impl.UUIDUtil.BYTE_OFFSET_CLOCK_LO;
+import static com.fasterxml.uuid.impl.UUIDUtil.BYTE_OFFSET_CLOCK_MID;
+import static com.fasterxml.uuid.impl.UUIDUtil.BYTE_OFFSET_CLOCK_SEQUENCE;
+
+
+public class UUIDUtils {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f1c61867/stack/corepersistence/model/src/test/java/org/apache/usergrid/persistence/model/util/UUIDGeneratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/model/src/test/java/org/apache/usergrid/persistence/model/util/UUIDGeneratorTest.java b/stack/corepersistence/model/src/test/java/org/apache/usergrid/persistence/model/util/UUIDGeneratorTest.java
index ef1a9e9..738b788 100644
--- a/stack/corepersistence/model/src/test/java/org/apache/usergrid/persistence/model/util/UUIDGeneratorTest.java
+++ b/stack/corepersistence/model/src/test/java/org/apache/usergrid/persistence/model/util/UUIDGeneratorTest.java
@@ -116,4 +116,21 @@ public class UUIDGeneratorTest {
return null;
}
}
+
+ @Test
+ public void testUUIDOrderingWithTimestamp(){
+ long first = 10000l;
+ long second = first+1;
+
+ //ensure the values are reproducible
+ UUID firstUUID = UUIDGenerator.newTimeUUID( first );
+ UUID firstUUIDx2 = UUIDGenerator.newTimeUUID( first );
+ UUID secondUUID = UUIDGenerator.newTimeUUID( second );
+
+ assertTrue(UUIDComparator.staticCompare( firstUUID, secondUUID ) < 0);
+ assertTrue(UUIDComparator.staticCompare( secondUUID, firstUUID ) > 0);
+
+ assertTrue(UUIDComparator.staticCompare( firstUUIDx2, firstUUID ) == 0);
+
+ }
}