Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/12 19:00:45 UTC
[4/9] git commit: WIP, overwrite
WIP, overwrite
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/f6567693
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/f6567693
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/f6567693
Branch: refs/heads/USERGRID-188
Commit: f6567693a2ad77f6a8991f54b3aa8414c63c338e
Parents: e9fa368
Author: Todd Nine <to...@apache.org>
Authored: Wed Aug 6 09:41:50 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Aug 6 09:41:50 2014 -0600
----------------------------------------------------------------------
.../impl/shard/ShardEntryGroup.java | 23 ++++
.../impl/shard/impl/NodeShardCacheImpl.java | 124 +++++++++++++++----
.../impl/shard/NodeShardAllocationTest.java | 11 +-
3 files changed, 126 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f6567693/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
index bea428b..37eab83 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -109,6 +109,20 @@ public class ShardEntryGroup {
/**
+ * Return the minimum shard based on time indexes
+ * @return the minimum shard, or null if this group contains no shards
+ */
+ public Shard getMinShard() {
+ final int size = shards.size();
+
+ if(size < 1){
+ return null;
+ }
+
+ return shards.get(size-1);
+ }
+
+ /**
* Get the entries that we should read from.
*/
public Collection<Shard> getReadShards() {
@@ -135,6 +149,15 @@ public class ShardEntryGroup {
/**
+ * Return true if we have a pending compaction
+ * @return true if a compaction still needs to run for this group
+ */
+ public boolean isCompactionPending(){
+ return !isTooSmallToCompact();
+ }
+
+
+ /**
* Get the shard all compactions should write to. Null indicates we cannot find a shard that could
* be used as a compaction target. Note that this shard may not have surpassed the delta yet.
* You should invoke "shouldCompact" first to ensure all criteria are met before initiating compaction
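
For context, a minimal caller-side sketch of the contract these additions imply. The
method maybeCompact and its variables are hypothetical; only the ShardEntryGroup calls
(getMinShard, isCompactionPending, shouldCompact, getCompactionTarget) are from this class:

    // Hypothetical usage sketch, not part of this commit
    void maybeCompact( final ShardEntryGroup group, final long now ) {

        // getMinShard() returns null for an empty group, so guard first
        final Shard min = group.getMinShard();

        if ( min == null ) {
            return;
        }

        // Per the javadoc, invoke shouldCompact() before trusting getCompactionTarget()
        if ( group.isCompactionPending() && group.shouldCompact( now ) ) {

            final Shard target = group.getCompactionTarget();

            if ( target != null ) {
                // merge entries from the group's other shards into 'target'
            }
        }
    }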
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f6567693/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index 6ac83cf..0b51902 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -29,6 +29,7 @@ import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
@@ -45,6 +46,8 @@ import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import com.google.inject.Inject;
@@ -56,22 +59,27 @@ public class NodeShardCacheImpl implements NodeShardCache {
private final NodeShardAllocation nodeShardAllocation;
private final GraphFig graphFig;
+ private final TimeService timeservice;
private LoadingCache<CacheKey, CacheEntry> graphs;
/**
- *
- * @param nodeShardAllocation
+ * @param nodeShardAllocation
* @param graphFig
+ * @param timeservice
*/
@Inject
- public NodeShardCacheImpl( final NodeShardAllocation nodeShardAllocation, final GraphFig graphFig ) {
+ public NodeShardCacheImpl( final NodeShardAllocation nodeShardAllocation, final GraphFig graphFig,
+ final TimeService timeservice ) {
+
Preconditions.checkNotNull( nodeShardAllocation, "nodeShardAllocation is required" );
Preconditions.checkNotNull( graphFig, "graphFig is required" );
+ Preconditions.checkNotNull( timeservice, "timeservice is required" );
this.nodeShardAllocation = nodeShardAllocation;
this.graphFig = graphFig;
+ this.timeservice = timeservice;
/**
* Add our listener to reconstruct the shard cache when configuration changes
@@ -83,6 +91,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
if ( propertyName.equals( GraphFig.SHARD_CACHE_SIZE ) || propertyName
.equals( GraphFig.SHARD_CACHE_TIMEOUT ) ) {
+
updateCache();
}
}
@@ -150,31 +159,18 @@ public class NodeShardCacheImpl implements NodeShardCache {
*/
private void updateCache() {
- this.graphs = CacheBuilder.newBuilder().maximumSize( graphFig.getShardCacheSize() )
- .expireAfterWrite( graphFig.getShardCacheSize(), TimeUnit.MILLISECONDS )
- .build( new CacheLoader<CacheKey, CacheEntry>() {
-
-
- @Override
- public CacheEntry load( final CacheKey key ) throws Exception {
-
-
-// /**
-// * Perform an audit in case we need to allocate a new shard
-// */
-// nodeShardAllocation.auditMaxShard( key.scope,
-// // key.id, key.types );
-// // //TODO, we need to put some sort of upper
-// // bounds on this, it could possibly get too large
+ /**
+ * TODO: Validate that when we swap this cache out during a config change, the old one garbage collects properly
+ */
+ if(this.graphs != null){
+ this.graphs.invalidateAll();
+ }
- final Iterator<ShardEntryGroup> edges = nodeShardAllocation
- .getShards( key.scope, key.id, key.nodeType, Optional.<Shard>absent(),
- key.types );
- return new CacheEntry( edges );
- }
- } );
+ this.graphs = CacheBuilder.newBuilder().maximumSize( graphFig.getShardCacheSize() )
+ .expireAfterWrite( graphFig.getShardCacheTimeout(), TimeUnit.MILLISECONDS )
+ .removalListener( new ShardRemovalListener() )
+ .build( new ShardCacheLoader() );
}
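
The rebuilt cache wires in the removal listener through the standard Guava CacheBuilder
API. For reference, a self-contained sketch of the same pattern with toy String types
(illustrative only, not code from this commit):

    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import com.google.common.cache.RemovalListener;
    import com.google.common.cache.RemovalNotification;

    public class RemovalListenerSketch {

        public static void main( String[] args ) throws Exception {

            final LoadingCache<String, String> cache = CacheBuilder.newBuilder()
                    .maximumSize( 100 )
                    .expireAfterWrite( 30, TimeUnit.SECONDS )
                    // fires on explicit invalidation, size eviction, and expiry
                    .removalListener( new RemovalListener<String, String>() {
                        @Override
                        public void onRemoval( final RemovalNotification<String, String> n ) {
                            System.out.println( "removed " + n.getKey() + ": " + n.getCause() );
                        }
                    } )
                    .build( new CacheLoader<String, String>() {
                        @Override
                        public String load( final String key ) throws Exception {
                            return "loaded-" + key;
                        }
                    } );

            cache.get( "foo" );
            cache.invalidate( "foo" ); // triggers the removal listener
        }
    }

Worth noting: invalidateAll() notifies the removal listener for every live entry, so
the invalidateAll() call above will run the listener's audit logic for each cached key
whenever the configuration changes.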
@@ -248,8 +244,12 @@ public class NodeShardCacheImpl implements NodeShardCache {
private CacheEntry( final Iterator<ShardEntryGroup> shards ) {
this.shards = new TreeMap<>(ShardEntriesComparator.INSTANCE);
+ /**
+ * TODO: we need to bound this. While I don't envision more than a thousand groups max,
+ * we don't want one entry to hog all our RAM
+ */
for ( ShardEntryGroup shard : IterableUtil.wrap( shards ) ) {
- this.shards.put(shard.getCompactionTarget().getShardIndex() , shard );
+ this.shards.put(shard.getMinShard().getShardIndex() , shard );
}
}
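
The bounding TODO above could be honored with a simple guard. A purely illustrative
sketch, where MAX_GROUPS is a made-up constant rather than anything defined in this
commit:

    // Illustrative only; MAX_GROUPS is hypothetical
    private static final int MAX_GROUPS = 10000;

    private CacheEntry( final Iterator<ShardEntryGroup> shards ) {
        this.shards = new TreeMap<>( ShardEntriesComparator.INSTANCE );

        for ( ShardEntryGroup shard : IterableUtil.wrap( shards ) ) {

            // refuse to buffer an unbounded number of groups in memory
            if ( this.shards.size() >= MAX_GROUPS ) {
                throw new GraphRuntimeException( "Too many shard groups to cache for a single node" );
            }

            this.shards.put( shard.getMinShard().getShardIndex(), shard );
        }
    }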
@@ -301,5 +301,77 @@ public class NodeShardCacheImpl implements NodeShardCache {
return Long.compare( o1, o2 ) * -1;
}
}
+
+
+ }
+
+ private final class ShardCacheLoader extends CacheLoader<CacheKey, CacheEntry> {
+
+
+ @Override
+ public CacheEntry load( final CacheKey key ) throws Exception {
+
+
+ // /**
+ // * Perform an audit in case we need to allocate a new shard
+ // */
+ // nodeShardAllocation.auditMaxShard( key.scope,
+ // // key.id, key.types );
+ // // //TODO, we need to put some sort of upper
+ // // bounds on this, it could possibly get too large
+
+
+ final Iterator<ShardEntryGroup> edges = nodeShardAllocation
+ .getShards( key.scope, key.id, key.nodeType, Optional.<Shard>absent(),
+ key.types );
+
+ return new CacheEntry( edges );
+ }
+ }
+
+ private final class ShardRemovalListener implements RemovalListener<CacheKey, CacheEntry>{
+
+ @Override
+ public void onRemoval( final RemovalNotification<CacheKey, CacheEntry> notification ) {
+
+
+
+ CacheKey key = notification.getKey();
+ CacheEntry entry = notification.getValue();
+
+
+ Iterator<ShardEntryGroup> groups = entry.getShards( Long.MAX_VALUE );
+
+
+ /**
+ * Start at our max shard index and walk down, auditing every group
+ */
+ while(groups.hasNext()){
+ ShardEntryGroup group = groups.next();
+
+ /**
+ * If a compaction is pending for this group, don't allocate anything;
+ * otherwise audit to see whether we should allocate a new shard
+ */
+ if(!group.isCompactionPending()){
+ nodeShardAllocation.auditMaxShard(key.scope, key.id, key.nodeType, key.types );
+ continue;
+ }
+
+ /**
+ * Do the compaction
+ */
+ if(group.shouldCompact( timeservice.getCurrentTime() )){
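+ //TODO (WIP): perform the compaction here; this branch is intentionally a no-op in this commit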
+
+ }
+
+ }
+
+
+ }
}
}
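
One design caveat with driving audits from cache removal: Guava invokes removal
listeners synchronously on whatever thread triggers the removal, so the audit (and
eventually compaction) work in ShardRemovalListener runs inline with cache access. If
that proves too heavy, Guava's RemovalListeners.asynchronous can push notifications
onto an executor. A sketch; the helper and its single-thread executor are illustrative
choices, not part of this commit:

    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;

    import com.google.common.cache.RemovalListener;
    import com.google.common.cache.RemovalListeners;

    public class AsyncListenerSketch {

        // Wraps any removal listener so notifications run on an executor
        // instead of the thread that triggered the removal
        static <K, V> RemovalListener<K, V> offload( final RemovalListener<K, V> listener ) {
            final Executor executor = Executors.newSingleThreadExecutor();
            return RemovalListeners.asynchronous( listener, executor );
        }
    }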
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f6567693/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index a31a8ad..32cbc40 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -536,10 +536,9 @@ public class NodeShardAllocationTest {
ShardEntryGroup shardEntryGroup = result.next();
- final Shard expected = new Shard( 0, 0, false );
-
- assertEquals( "Future shard returned", expected, shardEntryGroup.getCompactionTarget() );
+ final Shard rootShard = new Shard( 0, 0, true );
+ assertEquals("Shard size expected", 1, shardEntryGroup.entrySize());
//now verify all 4 are in this group. This is because the first shard (0,0) (n-1_ may be the only shard other
//nodes see while we're rolling our state. This means it should be read and merged from as well
@@ -549,12 +548,12 @@ public class NodeShardAllocationTest {
Collection<Shard> readShards = shardEntryGroup.getReadShards( );
- assertTrue( "0 shard allocated", writeShards.contains( expected ) );
+ assertTrue( "root shard allocated", writeShards.contains( rootShard ) );
- assertTrue( "0 shard allocated", readShards.contains( expected ) );
+ assertTrue( "root shard allocated", readShards.contains( rootShard ) );
- assertFalse( "No shard allocated", result.hasNext() );
+ assertFalse( "No other shard group allocated", result.hasNext() );
}