You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/12 19:00:45 UTC

[4/9] git commit: WIP, overwrite

WIP, overwrite


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/f6567693
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/f6567693
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/f6567693

Branch: refs/heads/USERGRID-188
Commit: f6567693a2ad77f6a8991f54b3aa8414c63c338e
Parents: e9fa368
Author: Todd Nine <to...@apache.org>
Authored: Wed Aug 6 09:41:50 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Aug 6 09:41:50 2014 -0600

----------------------------------------------------------------------
 .../impl/shard/ShardEntryGroup.java             |  23 ++++
 .../impl/shard/impl/NodeShardCacheImpl.java     | 124 +++++++++++++++----
 .../impl/shard/NodeShardAllocationTest.java     |  11 +-
 3 files changed, 126 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f6567693/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
index bea428b..37eab83 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -109,6 +109,20 @@ public class ShardEntryGroup {
 
 
     /**
+     * Return the minimum shard based on time indexes
+     * @return
+     */
+    public Shard getMinShard() {
+        final int size = shards.size();
+
+        if(size < 1){
+            return null;
+        }
+
+        return shards.get(size-1);
+    }
+
+    /**
      * Get the entries that we should read from.
      */
     public Collection<Shard> getReadShards() {
@@ -135,6 +149,15 @@ public class ShardEntryGroup {
 
 
     /**
+     * Return true if we have a pending compaction
+     * @return
+     */
+    public boolean isCompactionPending(){
+        return !isTooSmallToCompact();
+    }
+
+
+    /**
      * Get the shard all compactions should write to.  Null indicates we cannot find a shard that could
      * be used as a compaction target.  Note that this shard may not have surpassed the delta yet
      * You should invoke "shouldCompact" first to ensure all criteria are met before initiating compaction

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f6567693/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index 6ac83cf..0b51902 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -29,6 +29,7 @@ import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
@@ -45,6 +46,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import com.google.inject.Inject;
 
 
@@ -56,22 +59,27 @@ public class NodeShardCacheImpl implements NodeShardCache {
 
     private final NodeShardAllocation nodeShardAllocation;
     private final GraphFig graphFig;
+    private final TimeService timeservice;
 
     private LoadingCache<CacheKey, CacheEntry> graphs;
 
 
     /**
-     *
-     * @param nodeShardAllocation
+     *  @param nodeShardAllocation
      * @param graphFig
+     * @param timeservice
      */
     @Inject
-    public NodeShardCacheImpl( final NodeShardAllocation nodeShardAllocation, final GraphFig graphFig ) {
+    public NodeShardCacheImpl( final NodeShardAllocation nodeShardAllocation, final GraphFig graphFig,
+                               final TimeService timeservice ) {
+
         Preconditions.checkNotNull( nodeShardAllocation, "nodeShardAllocation is required" );
         Preconditions.checkNotNull( graphFig, "consistencyFig is required" );
+        Preconditions.checkNotNull( timeservice, "timeservice is required" );
 
         this.nodeShardAllocation = nodeShardAllocation;
         this.graphFig = graphFig;
+        this.timeservice = timeservice;
 
         /**
          * Add our listener to reconstruct the shard
@@ -83,6 +91,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
 
                 if ( propertyName.equals( GraphFig.SHARD_CACHE_SIZE ) || propertyName
                         .equals( GraphFig.SHARD_CACHE_TIMEOUT ) ) {
+
                     updateCache();
                 }
             }
@@ -150,31 +159,18 @@ public class NodeShardCacheImpl implements NodeShardCache {
      */
     private void updateCache() {
 
-        this.graphs = CacheBuilder.newBuilder().maximumSize( graphFig.getShardCacheSize() )
-                                  .expireAfterWrite( graphFig.getShardCacheSize(), TimeUnit.MILLISECONDS )
-                                  .build( new CacheLoader<CacheKey, CacheEntry>() {
-
-
-                                      @Override
-                                      public CacheEntry load( final CacheKey key ) throws Exception {
-
-
-//                                                                    /**
-//                                                                     * Perform an audit in case we need to allocate a new shard
-//                                                                     */
-//                                                                    nodeShardAllocation.auditMaxShard( key.scope,
-//                                          // key.id, key.types );
-//                                          //                          //TODO, we need to put some sort of upper
-//                                          // bounds on this, it could possibly get too large
+        /**
+         * TODO: Validate that if we swap this during a config change, it garbage collects properly
+         */
 
+        if(this.graphs != null){
+            this.graphs.invalidateAll();
+        }
 
-                                          final Iterator<ShardEntryGroup> edges = nodeShardAllocation
-                                                  .getShards( key.scope, key.id, key.nodeType, Optional.<Shard>absent(),
-                                                          key.types );
 
-                                          return new CacheEntry( edges );
-                                      }
-                                  } );
+        this.graphs = CacheBuilder.newBuilder().maximumSize( graphFig.getShardCacheSize() )
+                                  .expireAfterWrite( graphFig.getShardCacheSize(), TimeUnit.MILLISECONDS ).removalListener( new ShardRemovalListener() )
+                                  .build( new ShardCacheLoader() );
     }
 
 
@@ -248,8 +244,12 @@ public class NodeShardCacheImpl implements NodeShardCache {
         private CacheEntry( final Iterator<ShardEntryGroup> shards ) {
             this.shards = new TreeMap<>(ShardEntriesComparator.INSTANCE);
 
+            /**
+             * TODO, we need to bound this.  While I don't envision more than a thousand groups max,
+             * we don't want one to hog all our RAM
+             */
             for ( ShardEntryGroup shard : IterableUtil.wrap( shards ) ) {
-                this.shards.put(shard.getCompactionTarget().getShardIndex() , shard );
+                this.shards.put(shard.getMinShard().getShardIndex() , shard );
             }
         }
 
@@ -301,5 +301,77 @@ public class NodeShardCacheImpl implements NodeShardCache {
                 return Long.compare( o1, o2 ) * -1;
             }
         }
+
+
+    }
+
+    private final class ShardCacheLoader extends CacheLoader<CacheKey, CacheEntry> {
+
+
+        @Override
+        public CacheEntry load( final CacheKey key ) throws Exception {
+
+
+            //                                                                    /**
+            //                                                                     * Perform an audit in case we need to allocate a new shard
+            //                                                                     */
+            //                                                                    nodeShardAllocation.auditMaxShard( key.scope,
+            //                                          // key.id, key.types );
+            //                                          //                          //TODO, we need to put some sort of upper
+            //                                          // bounds on this, it could possibly get too large
+
+
+            final Iterator<ShardEntryGroup> edges = nodeShardAllocation
+                    .getShards( key.scope, key.id, key.nodeType, Optional.<Shard>absent(),
+                            key.types );
+
+            return new CacheEntry( edges );
+        }
+    }
+
+    private final class ShardRemovalListener implements RemovalListener<CacheKey, CacheEntry>{
+
+        @Override
+        public void onRemoval( final RemovalNotification<CacheKey, CacheEntry> notification ) {
+
+
+
+            CacheKey key = notification.getKey();
+            CacheEntry entry = notification.getValue();
+
+
+            Iterator<ShardEntryGroup> groups = entry.getShards( Long.MAX_VALUE );
+
+
+            /**
+             * Start at our max, then
+             */
+
+            //audit all our groups
+            while(groups.hasNext()){
+                ShardEntryGroup group = groups.next();
+
+                /**
+                 * We have a compaction that may need to be run, don't allocate anything
+                 */
+                if(!group.isCompactionPending()){
+                    /**
+                     * Check if we should allocate, we may want to
+                     */
+                    nodeShardAllocation.auditMaxShard(key.scope, key.id, key.nodeType, key.types  );
+                    continue;
+                }
+
+                /**
+                 * Do the compaction
+                 */
+                if(group.shouldCompact( timeservice.getCurrentTime() )){
+
+                }
+
+            }
+
+
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f6567693/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index a31a8ad..32cbc40 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -536,10 +536,9 @@ public class NodeShardAllocationTest {
 
         ShardEntryGroup shardEntryGroup = result.next();
 
-        final Shard expected = new Shard( 0, 0, false );
-
-        assertEquals( "Future shard returned", expected, shardEntryGroup.getCompactionTarget() );
+        final Shard rootShard = new Shard( 0, 0, true );
 
+        assertEquals("Shard size expected", 1, shardEntryGroup.entrySize());
 
         //now verify all 4 are in this group.  This is because the first shard (0,0) (n-1_ may be the only shard other
         //nodes see while we're rolling our state.  This means it should be read and merged from as well
@@ -549,12 +548,12 @@ public class NodeShardAllocationTest {
         Collection<Shard> readShards = shardEntryGroup.getReadShards( );
 
 
-        assertTrue( "0 shard allocated", writeShards.contains( expected ) );
+        assertTrue( "root shard allocated", writeShards.contains( rootShard ) );
 
-        assertTrue( "0 shard allocated", readShards.contains( expected ) );
+        assertTrue( "root shard allocated", readShards.contains( rootShard ) );
 
 
-        assertFalse( "No shard allocated", result.hasNext() );
+        assertFalse( "No other shard group allocated", result.hasNext() );
     }