You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/04/12 00:21:31 UTC

[5/7] git commit: Updated shard and counter interfaces

Updated shard and counter interfaces


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/45c3ba5b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/45c3ba5b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/45c3ba5b

Branch: refs/heads/asyncqueue
Commit: 45c3ba5b57707752c9907f614deff36adb80ae82
Parents: c178fc6
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Apr 4 17:00:11 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Apr 4 17:00:11 2014 -0700

----------------------------------------------------------------------
 stack/corepersistence/graph/pom.xml             |   9 +
 .../usergrid/persistence/graph/GraphFig.java    |  10 +-
 .../graph/impl/cache/NodeShardAllocation.java   |  71 -----
 .../impl/cache/NodeShardAllocationImpl.java     | 145 ----------
 .../graph/impl/cache/NodeShardCache.java        |  43 ---
 .../graph/impl/cache/NodeShardCacheImpl.java    | 264 -------------------
 .../graph/impl/shard/NodeShardAllocation.java   |  73 +++++
 .../impl/shard/NodeShardAllocationImpl.java     | 149 +++++++++++
 .../graph/impl/shard/NodeShardCache.java        |  43 +++
 .../graph/impl/shard/NodeShardCacheImpl.java    | 237 +++++++++++++++++
 .../impl/shard/TimeShardApproximation.java      |  51 ++++
 .../impl/shard/TimeShardApproximationImpl.java  | 118 +++++++++
 .../serialization/EdgeSeriesSerialization.java  |   2 +-
 .../impl/NodeSerializationImpl.java             |   4 +-
 .../graph/serialization/util/IterableUtil.java  |  43 +++
 .../graph/impl/cache/NodeShardCacheTest.java    | 257 ------------------
 .../graph/impl/shard/NodeShardCacheTest.java    | 258 ++++++++++++++++++
 17 files changed, 992 insertions(+), 785 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/pom.xml b/stack/corepersistence/graph/pom.xml
index 1abec48..64587a9 100644
--- a/stack/corepersistence/graph/pom.xml
+++ b/stack/corepersistence/graph/pom.xml
@@ -39,6 +39,14 @@
       <version>${project.version}</version>
     </dependency>
 
+
+    <!-- utilized for hyperloglog cardinality terms -->
+    <dependency>
+        <groupId>com.clearspring.analytics</groupId>
+        <artifactId>stream</artifactId>
+        <version>2.6.0</version>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.usergrid</groupId>
       <artifactId>collection</artifactId>
@@ -54,6 +62,7 @@
       <scope>test</scope>
     </dependency>
 
+
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 43c3b9b..03f0a6e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -49,9 +49,11 @@ public interface GraphFig extends GuicyFig {
 
     public static final String READ_TIMEOUT = "usergrid.graph.read.timeout";
 
-    public static final String CACHE_TIMEOUT = "usergrid.graph.cache.timeout";
+    public static final String CACHE_TIMEOUT = "usergrid.graph.shard.timeout";
 
-    public static final String CACHE_SIZE = "usergrid.graph.cache.size";
+    public static final String CACHE_SIZE = "usergrid.graph.shard.size";
+
+    public static final String COUNTER_PRECISION_LOSS = "usergrid.graph.counter.precision.loss";
 
     @Default( "1000" )
     @Key(SCAN_PAGE_SIZE)
@@ -100,5 +102,9 @@ public interface GraphFig extends GuicyFig {
     @Default( "30000" )
     @Key( CACHE_TIMEOUT )
     long getCacheTimeout();
+
+    @Default(".02")
+    @Key(COUNTER_PRECISION_LOSS)
+    double getCounterPrecisionLoss();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocation.java
deleted file mode 100644
index ad4b330..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocation.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.impl.cache;
-
-
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- * Interface used to create and retrieve shards
- */
-public interface NodeShardAllocation {
-
-
-
-    /**
-     * Get all shards for the given info.  If none exist, a default shard should be allocated
-     * @param scope
-     * @param nodeId
-     * @param maxShardId The max value to start seeking from.  Values <= this will be returned
-     * @param count The count of elements to return
-     * @param edgeTypes
-     * @return A list of all shards <= the current shard.  This will always return MIN_UUID if no shards are allocated
-     */
-    public List<UUID> getShards(final OrganizationScope scope, final Id nodeId, UUID maxShardId, int count,  final String... edgeTypes);
-
-
-    /**
-     * Audit our highest shard for it's maximum capacity.  If it has reached the max capacity <=, it will allocate a new shard
-     *
-     * @param scope The organization scope
-     * @param nodeId The node id
-     * @param edgeType The edge types
-     * @return True if a new shard was allocated
-     */
-    public boolean auditMaxShard(final OrganizationScope scope, final Id nodeId, final String... edgeType);
-
-
-    /**
-     * Increment the shard Id the specified amount
-     * @param scope The scope
-     * @param nodeId The node id
-     * @param shardId The shard id
-     * @param count The count
-     * @param edgeType The edge type
-     */
-    public void increment(final OrganizationScope scope, final Id nodeId, final UUID shardId, int count, final String... edgeType);
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocationImpl.java
deleted file mode 100644
index d03ccaa..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocationImpl.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.impl.cache;
-
-
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.serialization.EdgeSeriesCounterSerialization;
-import org.apache.usergrid.persistence.graph.serialization.EdgeSeriesSerialization;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.inject.Inject;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import static org.apache.usergrid.persistence.graph.impl.Constants.MAX_UUID;
-
-
-/**
- * Implementation of the node shard monitor and allocation
- */
-public class NodeShardAllocationImpl implements NodeShardAllocation {
-
-
-    private final EdgeSeriesSerialization edgeSeriesSerialization;
-    private final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization;
-    private final GraphFig graphFig;
-
-
-    @Inject
-    public NodeShardAllocationImpl( final EdgeSeriesSerialization edgeSeriesSerialization,
-                                    final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization,
-                                    final GraphFig graphFig ) {
-        this.edgeSeriesSerialization = edgeSeriesSerialization;
-        this.edgeSeriesCounterSerialization = edgeSeriesCounterSerialization;
-        this.graphFig = graphFig;
-    }
-
-
-    @Override
-    public List<UUID> getShards( final OrganizationScope scope, final Id nodeId, final UUID maxShardId, final int count,
-                                 final String... edgeTypes ) {
-        return edgeSeriesSerialization.getEdgeMetaData( scope, nodeId, maxShardId, count, edgeTypes );
-    }
-
-
-    @Override
-    public boolean auditMaxShard( final OrganizationScope scope, final Id nodeId, final String... edgeType ) {
-
-        final UUID now = UUIDGenerator.newTimeUUID();
-
-        List<UUID> maxShards = getShards( scope, nodeId, MAX_UUID, 1, edgeType );
-
-        //if the first shard has already been allocated, do nothing.
-
-        //now is already > than the max, don't do anything
-        if ( maxShards.size() > 0 && UUIDComparator.staticCompare( now, maxShards.get( 0 ) ) < 0 ) {
-            return false;
-        }
-
-        //allocate a new shard
-        //TODO T.N. modify the time uuid utils to allow future generation, this is incorrect, but a place holder
-        final UUID futureUUID = UUIDGenerator.newTimeUUID();
-
-        try {
-            this.edgeSeriesSerialization.writeEdgeMeta( scope, nodeId, futureUUID, edgeType ).execute();
-        }
-        catch ( ConnectionException e ) {
-            throw new RuntimeException( "Unable to write the new edge metadata" );
-        }
-
-        UUID max = null;
-
-
-        MutationBatch rollup = null;
-        boolean completed = false;
-
-        //TODO, change this into an iterator
-        while ( !completed ) {
-
-            List<UUID> shards = getShards( scope, nodeId, MAX_UUID, 100, edgeType );
-
-            for ( UUID shardId : shards ) {
-                if ( UUIDComparator.staticCompare( shardId, max ) >= 0 ) {
-                    completed = true;
-                    break;
-                }
-
-
-                final MutationBatch batch = edgeSeriesSerialization.removeEdgeMeta( scope, nodeId, shardId, edgeType );
-
-                if ( rollup == null ) {
-                    rollup = batch;
-                }
-                else {
-                    rollup.mergeShallow( batch );
-                }
-            }
-
-
-            //while our max value is > than the value we just created, delete it
-        }
-
-        if(rollup != null){
-            try {
-                rollup.execute();
-            }
-            catch ( ConnectionException e ) {
-                throw new RuntimeException( "Unable to cleanup allocated shards" );
-            }
-        }
-
-        return true;
-    }
-
-
-    @Override
-    public void increment( final OrganizationScope scope, final Id nodeId, final UUID shardId, final int count,
-                           final String... edgeType ) {
-        //delegate
-        edgeSeriesCounterSerialization.incrementMetadataCount( scope, nodeId, shardId, count, edgeType );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCache.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCache.java
deleted file mode 100644
index 251ee30..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCache.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.graph.impl.cache;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- *  Cache implementation for returning versions based on the slice.  This cache may be latent.  As a result
- *  the allocation of new shards should be 2*cache timeout in the future.
- *
- */
-public interface NodeShardCache {
-
-
-    /**
-     * Get the time meta data for the given node
-     * @param nodeId
-     * @param time The time to select the slice for.
-     * @param edgeType
-     */
-    public UUID getSlice(final OrganizationScope scope, final Id nodeId, final UUID time, final String... edgeType);
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCacheImpl.java
deleted file mode 100644
index 943bf5f..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCacheImpl.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.graph.impl.cache;
-
-import static org.apache.usergrid.persistence.graph.impl.Constants.*;
-
-import java.beans.PropertyChangeEvent;
-import java.beans.PropertyChangeListener;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.serialization.EdgeSeriesSerialization;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.inject.Inject;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/**
- * Simple implementation of the cache.  Uses a local Guava cache with a timeout.  If a value is not present in the
- * cache, it will need to be searched via cassandra.
- */
-public class NodeShardCacheImpl implements NodeShardCache {
-
-
-
-    private static final int MAX_SHARD_COUNT = 10000;
-
-
-
-    private final NodeShardAllocation nodeShardAllocation;
-    private final GraphFig graphFig;
-
-    private LoadingCache<CacheKey, CacheEntry> graphs;
-
-
-    /**
-     *
-     * @param nodeShardAllocation
-     * @param graphFig
-     */
-    @Inject
-    public NodeShardCacheImpl( final  NodeShardAllocation nodeShardAllocation, final GraphFig graphFig ) {
-        this.nodeShardAllocation = nodeShardAllocation;
-        this.graphFig = graphFig;
-
-        /**
-         * Add our listener to reconstruct the cache
-         */
-        this.graphFig.addPropertyChangeListener( new PropertyChangeListener() {
-            @Override
-            public void propertyChange( final PropertyChangeEvent evt ) {
-                final String propertyName = evt.getPropertyName();
-
-                if ( propertyName.equals( GraphFig.CACHE_SIZE ) || propertyName.equals( GraphFig.CACHE_TIMEOUT ) ) {
-                    updateCache();
-                }
-            }
-        } );
-
-        /**
-         * Initialize the cache
-         */
-        updateCache();
-    }
-
-
-    @Override
-    public UUID getSlice( final OrganizationScope scope, final Id nodeId, final UUID time, final String... edgeType ) {
-
-
-        final CacheKey key = new CacheKey(scope, nodeId, edgeType  );
-        CacheEntry entry;
-
-        try {
-            entry = this.graphs.get( key );
-        }
-        catch ( ExecutionException e ) {
-            throw new RuntimeException( "Unable to load cache key for graph", e );
-        }
-
-        final UUID shardId = entry.getShardId(time);
-
-        if(shardId != null){
-            return shardId;
-        }
-
-        //if we get here, something went wrong, our cache should always have a time UUID to return to us
-        throw new RuntimeException( "No time UUID shard was found and could not allocate one" );
-
-    }
-
-
-
-    /**
-     * This is a race condition.  We could re-init the cache while another thread is reading it.  This is fine, the read
-     * doesn't have to be precise.  The algorithm accounts for stale data.
-     */
-    private void updateCache() {
-
-        this.graphs = CacheBuilder.newBuilder().maximumSize( graphFig.getCacheSize() )
-                                  .expireAfterWrite( graphFig.getCacheTimeout(), TimeUnit.MILLISECONDS )
-                                  .build( new CacheLoader<CacheKey, CacheEntry>() {
-
-
-                                      @Override
-                                      public CacheEntry load( final CacheKey key ) throws Exception {
-                                          //doing this with a static size could result in lost "old" shards, this needs to be an
-                                          //iterating cache that can refresh if we have a full size
-                                          final List<UUID> edges = nodeShardAllocation.getShards( key.scope,
-                                                  key.id, MAX_UUID, MAX_SHARD_COUNT, key.types );
-
-
-                                          /**
-                                           * Perform an async audit in case we need to allocate a new shard
-                                           */
-                                          nodeShardAllocation.auditMaxShard( key.scope, key.id, key.types );
-
-                                          return new CacheEntry( edges );
-                                      }
-                                  } );
-    }
-
-
-
-    /**
-     * Cache key for looking up items in the cache
-     */
-    private static class CacheKey {
-        private final OrganizationScope scope;
-        private final Id id;
-        private final String[] types;
-
-
-        private CacheKey( final OrganizationScope scope, final Id id, final String[] types ) {
-            this.scope = scope;
-            this.id = id;
-            this.types = types;
-        }
-
-
-        @Override
-        public boolean equals( final Object o ) {
-            if ( this == o ) {
-                return true;
-            }
-            if ( o == null || getClass() != o.getClass() ) {
-                return false;
-            }
-
-            final CacheKey cacheKey = ( CacheKey ) o;
-
-            if ( !id.equals( cacheKey.id ) ) {
-                return false;
-            }
-            if ( !Arrays.equals( types, cacheKey.types ) ) {
-                return false;
-            }
-
-            return true;
-        }
-
-
-        @Override
-        public int hashCode() {
-            int result = id.hashCode();
-            result = 31 * result + Arrays.hashCode( types );
-            return result;
-        }
-    }
-
-
-    /**
-     * An entry for the cache.
-     */
-    private static class CacheEntry {
-        /**
-         * Get the list of all segments
-         */
-        private List<UUID> shards;
-
-
-        private CacheEntry( final List<UUID> shards ) {
-            this.shards = shards;
-        }
-
-
-        /**
-         * Get the shard's UUID for the uuid we're attempting to seek from
-         *
-         * @param seek
-         * @return
-         */
-        public UUID getShardId( final UUID seek) {
-            int index = Collections.binarySearch( this.shards, seek, MetaComparator.INSTANCE );
-
-            /**
-             * We have an exact match, return it
-             */
-            if(index > -1){
-               return shards.get( index );
-            }
-
-
-            //update the index to represent the index we should insert to and read it.  This will be <= the UUID we were passed
-            //we subtract 2 to get the index < this one
-            index = (index*-1) -2;
-
-
-            if(index < shards.size()){
-                return shards.get( index );
-            }
-
-
-            return null;
-        }
-
-    }
-
-
-
-
-    /**
-     * UUID Comparator
-     */
-    private static class MetaComparator implements Comparator<UUID> {
-
-        public static final UUIDComparator INSTANCE = new UUIDComparator();
-
-
-        @Override
-        public int compare( final UUID o1, final UUID o2 ) {
-            return com.fasterxml.uuid.UUIDComparator.staticCompare( o1, o2 );
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocation.java
new file mode 100644
index 0000000..bede2ab
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocation.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.shard;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Interface used to create and retrieve shards
+ */
+public interface NodeShardAllocation {
+
+
+
+    /**
+     * Get all shards for the given info.  If none exist, a default shard should be allocated
+     *
+     * @param scope
+     * @param nodeId
+     * @param maxShardId The max value to start seeking from.  Values <= this will be returned
+     * @param pageSize The page size to use in the fetch
+     * @param edgeTypes
+     * @return A list of all shards <= the current shard.  This will always return MIN_UUID if no shards are allocated
+     */
+    public Iterator<UUID> getShards( final OrganizationScope scope, final Id nodeId, UUID maxShardId, int pageSize,
+                                     final String... edgeTypes );
+
+
+    /**
+     * Audit our highest shard for it's maximum capacity.  If it has reached the max capacity <=, it will allocate a new shard
+     *
+     * @param scope The organization scope
+     * @param nodeId The node id
+     * @param edgeType The edge types
+     * @return True if a new shard was allocated
+     */
+    public boolean auditMaxShard(final OrganizationScope scope, final Id nodeId, final String... edgeType);
+
+
+    /**
+     * Increment the shard Id the specified amount
+     * @param scope The scope
+     * @param nodeId The node id
+     * @param shardId The shard id
+     * @param count The count
+     * @param edgeType The edge type
+     */
+    public void increment(final OrganizationScope scope, final Id nodeId, final UUID shardId, int count, final String... edgeType);
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationImpl.java
new file mode 100644
index 0000000..e156db6
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationImpl.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.shard;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.consistency.TimeService;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSeriesCounterSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSeriesSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.apache.usergrid.persistence.graph.impl.Constants.MAX_UUID;
+
+
+/**
+ * Implementation of the node shard monitor and allocation
+ */
+public class NodeShardAllocationImpl implements NodeShardAllocation {
+
+
+    private final EdgeSeriesSerialization edgeSeriesSerialization;
+    private final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization;
+    private final TimeService timeService;
+    private final GraphFig graphFig;
+
+
+    @Inject
+    public NodeShardAllocationImpl( final EdgeSeriesSerialization edgeSeriesSerialization,
+                                    final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization,
+                                    final TimeService timeService, final GraphFig graphFig ) {
+        this.edgeSeriesSerialization = edgeSeriesSerialization;
+        this.edgeSeriesCounterSerialization = edgeSeriesCounterSerialization;
+        this.timeService = timeService;
+        this.graphFig = graphFig;
+    }
+
+
+    @Override
+    public Iterator<UUID> getShards( final OrganizationScope scope, final Id nodeId, final UUID maxShardId,
+                                     final int pageSize, final String... edgeTypes ) {
+        return edgeSeriesSerialization.getEdgeMetaData( scope, nodeId, maxShardId, pageSize, edgeTypes );
+    }
+
+
+    @Override
+    public boolean auditMaxShard( final OrganizationScope scope, final Id nodeId, final String... edgeType ) {
+
+        final UUID now = UUIDGenerator.newTimeUUID();
+
+        Iterator<UUID> maxShards = getShards( scope, nodeId, MAX_UUID, 1, edgeType );
+
+        //if the first shard has already been allocated, do nothing.
+
+        //now is already > than the max, don't do anything
+        if ( maxShards.hasNext() && UUIDComparator.staticCompare( now, maxShards.next() ) < 0 ) {
+            return false;
+        }
+
+        final long newShardTime = timeService.getCurrentTime() + graphFig.getCacheTimeout()*2;
+
+        //allocate a new shard at least now+ 2x our shard timeout.  We want to be sure that all replicas pick up on the new
+        //shard
+
+        final UUID futureUUID = UUIDGenerator.newTimeUUID(newShardTime);
+
+        try {
+            this.edgeSeriesSerialization.writeEdgeMeta( scope, nodeId, futureUUID, edgeType ).execute();
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to write the new edge metadata" );
+        }
+
+        UUID max = null;
+
+
+        MutationBatch rollup = null;
+
+        Iterator<UUID> shards = getShards( scope, nodeId, MAX_UUID, 1000, edgeType );
+
+        while ( shards.hasNext() ) {
+
+            final UUID shardId = shards.next();
+
+            if ( UUIDComparator.staticCompare( shardId, max ) >= 0 ) {
+                break;
+            }
+
+
+            //remove the edge that is too large from the node shard allocation
+            final MutationBatch batch = edgeSeriesSerialization.removeEdgeMeta( scope, nodeId, shardId, edgeType );
+
+            if ( rollup == null ) {
+                rollup = batch;
+            }
+            else {
+                rollup.mergeShallow( batch );
+            }
+
+
+            //while our max value is > than the value we just created, delete it
+        }
+
+        if ( rollup != null ) {
+            try {
+                rollup.execute();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to cleanup allocated shards" );
+            }
+        }
+
+        return true;
+    }
+
+
+    @Override
+    public void increment( final OrganizationScope scope, final Id nodeId, final UUID shardId, final int count,
+                           final String... edgeType ) {
+        //delegate
+        edgeSeriesCounterSerialization.incrementMetadataCount( scope, nodeId, shardId, count, edgeType );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCache.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCache.java
new file mode 100644
index 0000000..7515eb0
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCache.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.impl.shard;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ *  Cache implementation for returning versions based on the slice.  This shard may be latent.  As a result
+ *  the allocation of new shards should be 2*shard timeout in the future.
+ *
+ */
+public interface NodeShardCache {
+
+
+    /**
+     * Get the time meta data for the given node
+     * @param nodeId
+     * @param time The time to select the slice for.
+     * @param edgeType
+     */
+    public UUID getSlice(final OrganizationScope scope, final Id nodeId, final UUID time, final String... edgeType);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheImpl.java
new file mode 100644
index 0000000..055abbc
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheImpl.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.impl.shard;
+
+
+import java.beans.PropertyChangeEvent;
+import java.beans.PropertyChangeListener;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.serialization.util.IterableUtil;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.Inject;
+
+import static org.apache.usergrid.persistence.graph.impl.Constants.MAX_UUID;
+
+
+/**
+ * Simple implementation of the shard.  Uses a local Guava shard with a timeout.  If a value is not present in the
+ * shard, it will need to be searched via cassandra.
+ */
+public class NodeShardCacheImpl implements NodeShardCache {
+
+
+    private static final int SHARD_PAGE_SIZE = 1000;
+
+
+    private final NodeShardAllocation nodeShardAllocation;
+    private final GraphFig graphFig;
+
+    private LoadingCache<CacheKey, CacheEntry> graphs;
+
+
+    /**
+     *
+     * @param nodeShardAllocation
+     * @param graphFig
+     */
+    @Inject
+    public NodeShardCacheImpl( final NodeShardAllocation nodeShardAllocation, final GraphFig graphFig ) {
+        this.nodeShardAllocation = nodeShardAllocation;
+        this.graphFig = graphFig;
+
+        /**
+         * Add our listener to reconstruct the shard
+         */
+        this.graphFig.addPropertyChangeListener( new PropertyChangeListener() {
+            @Override
+            public void propertyChange( final PropertyChangeEvent evt ) {
+                final String propertyName = evt.getPropertyName();
+
+                if ( propertyName.equals( GraphFig.CACHE_SIZE ) || propertyName.equals( GraphFig.CACHE_TIMEOUT ) ) {
+                    updateCache();
+                }
+            }
+        } );
+
+        /**
+         * Initialize the shard
+         */
+        updateCache();
+    }
+
+
+    @Override
+    public UUID getSlice( final OrganizationScope scope, final Id nodeId, final UUID time, final String... edgeType ) {
+
+
+        final CacheKey key = new CacheKey( scope, nodeId, edgeType );
+        CacheEntry entry;
+
+        try {
+            entry = this.graphs.get( key );
+        }
+        catch ( ExecutionException e ) {
+            throw new RuntimeException( "Unable to load shard key for graph", e );
+        }
+
+        final UUID shardId = entry.getShardId( time );
+
+        if ( shardId != null ) {
+            return shardId;
+        }
+
+        //if we get here, something went wrong, our shard should always have a time UUID to return to us
+        throw new RuntimeException( "No time UUID shard was found and could not allocate one" );
+    }
+
+
+    /**
+     * This is a race condition.  We could re-init the shard while another thread is reading it.  This is fine, the read
+     * doesn't have to be precise.  The algorithm accounts for stale data.
+     */
+    private void updateCache() {
+
+        this.graphs = CacheBuilder.newBuilder().maximumSize( graphFig.getCacheSize() )
+                                  .expireAfterWrite( graphFig.getCacheTimeout(), TimeUnit.MILLISECONDS )
+                                  .build( new CacheLoader<CacheKey, CacheEntry>() {
+
+
+                                      @Override
+                                      public CacheEntry load( final CacheKey key ) throws Exception {
+
+                                          //TODO, we need to put some sort of upper bounds on this, it could possibly
+                                          //get too large
+                                          final Iterator<UUID> edges = nodeShardAllocation
+                                                  .getShards( key.scope, key.id, MAX_UUID, SHARD_PAGE_SIZE, key.types );
+
+
+                                          /**
+                                           * Perform an async audit in case we need to allocate a new shard
+                                           */
+                                          nodeShardAllocation.auditMaxShard( key.scope, key.id, key.types );
+
+                                          return new CacheEntry( edges );
+                                      }
+                                  } );
+    }
+
+
+    /**
+     * Cache key for looking up items in the shard
+     */
+    private static class CacheKey {
+        private final OrganizationScope scope;
+        private final Id id;
+        private final String[] types;
+
+
+        private CacheKey( final OrganizationScope scope, final Id id, final String[] types ) {
+            this.scope = scope;
+            this.id = id;
+            this.types = types;
+        }
+
+
+        @Override
+        public boolean equals( final Object o ) {
+            if ( this == o ) {
+                return true;
+            }
+            if ( o == null || getClass() != o.getClass() ) {
+                return false;
+            }
+
+            final CacheKey cacheKey = ( CacheKey ) o;
+
+            if ( !id.equals( cacheKey.id ) ) {
+                return false;
+            }
+            if ( !Arrays.equals( types, cacheKey.types ) ) {
+                return false;
+            }
+
+            return true;
+        }
+
+
+        @Override
+        public int hashCode() {
+            int result = id.hashCode();
+            result = 31 * result + Arrays.hashCode( types );
+            return result;
+        }
+    }
+
+
+    /**
+     * An entry for the shard.
+     */
+    private static class CacheEntry {
+        /**
+         * Get the list of all segments
+         */
+        private TreeSet<UUID> shards;
+
+
+        private CacheEntry( final Iterator<UUID> shards ) {
+            this.shards = new TreeSet<>( MetaComparator.INSTANCE );
+
+            for ( UUID shard : IterableUtil.wrap( shards ) ) {
+                this.shards.add( shard );
+            }
+        }
+
+
+        /**
+         * Get the shard's UUID for the uuid we're attempting to seek from
+         */
+        public UUID getShardId( final UUID seek ) {
+            return this.shards.floor( seek );
+        }
+    }
+
+
+    /**
+     * UUID Comparator
+     */
+    private static class MetaComparator implements Comparator<UUID> {
+
+        public static final UUIDComparator INSTANCE = new UUIDComparator();
+
+
+        @Override
+        public int compare( final UUID o1, final UUID o2 ) {
+            return com.fasterxml.uuid.UUIDComparator.staticCompare( o1, o2 );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/TimeShardApproximation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/TimeShardApproximation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/TimeShardApproximation.java
new file mode 100644
index 0000000..2fc0565
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/TimeShardApproximation.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.shard;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Interface for creating approximate estimates of shards
+ */
+public interface TimeShardApproximation {
+
+
+    /**
+       * Increment the shard Id the specified amount
+       * @param scope The scope
+       * @param nodeId The node id
+       * @param shardId The shard id
+       * @param edgeType The edge type
+       */
+      public void increment(final OrganizationScope scope, final Id nodeId, final UUID shardId, final String... edgeType);
+
+
+    /**
+     * Get the approximation of the number of unique items
+     * @return
+     */
+    public long getCount(final OrganizationScope scope, final Id nodeId, final UUID shardId, final String... edgeType);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/TimeShardApproximationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/TimeShardApproximationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/TimeShardApproximationImpl.java
new file mode 100644
index 0000000..f253372
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/TimeShardApproximationImpl.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.shard;
+
+
+import java.nio.charset.Charset;
+import java.util.UUID;
+
+import javax.inject.Inject;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.clearspring.analytics.hash.MurmurHash;
+import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+
+
+/**
+ * Implementation for doing approximation.  Uses hy perlog log.
+ *
+ *
+ * http://blog.aggregateknowledge.com/2012/10/25/sketch-of-the-day-hyperloglog-cornerstone-of-a-big-data-infrastructure/
+ *
+ * See also
+ *
+ * http://blog.aggregateknowledge.com/2012/10/25/sketch-of-the-day-hyperloglog-cornerstone-of-a-big-data-infrastructure/
+ *
+ * See also
+ *
+ * https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/cardinality
+ * /HyperLogLog.java
+ */
+
+
+public class TimeShardApproximationImpl implements TimeShardApproximation {
+
+    //TODO T.N. refactor into an expiring local cache.  We need each hyperlog to be it's own instance of a given shard
+    //if this is to work
+    private static final String UTF_8 = "UTF-8";
+    private static final Charset CHARSET = Charset.forName( UTF_8 );
+
+    /**
+     * We generate a new time uuid every time a new instance is started.  We should never re-use an instance.
+     */
+    private final UUID identity = UUIDGenerator.newTimeUUID();
+    private final HyperLogLog hyperLogLog;
+
+
+    /**
+     * Create a time shard approximation with the correct configuration.
+     */
+    @Inject
+    public TimeShardApproximationImpl( final GraphFig graphFig ) {
+        hyperLogLog = new HyperLogLog( graphFig.getCounterPrecisionLoss() );
+    }
+
+
+    @Override
+    public void increment( final OrganizationScope scope, final Id nodeId, final UUID shardId,
+                           final String... edgeType ) {
+
+        byte[] hash = hash( scope, nodeId, shardId, edgeType );
+
+
+        long longHash = MurmurHash.hash64( hash, hash.length );
+
+
+        hyperLogLog.offerHashed( longHash );
+    }
+
+
+    @Override
+    public long getCount( final OrganizationScope scope, final Id nodeId, final UUID shardId,
+                          final String... edgeType ) {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+
+    private byte[] hash( final OrganizationScope scope, final Id nodeId, final UUID shardId,
+                         final String... edgeTypes ) {
+        StringBuilder builder = new StringBuilder();
+
+        final Id organization = scope.getOrganization();
+
+        builder.append( organization.getUuid() );
+        builder.append( organization.getType() );
+
+        builder.append( nodeId.getUuid() );
+        builder.append( nodeId.getType() );
+
+        builder.append( shardId.toString() );
+
+        for ( String edgeType : edgeTypes ) {
+            builder.append( edgeType );
+        }
+
+        return builder.toString().getBytes( CHARSET );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java
index afa0041..17c342f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java
@@ -53,7 +53,7 @@ public interface EdgeSeriesSerialization {
      * @param types The types to use
      * @return
      */
-    public List<UUID> getEdgeMetaData(OrganizationScope scope, Id nodeId, UUID start, int count, String... types);
+    public Iterator<UUID> getEdgeMetaData(OrganizationScope scope, Id nodeId, UUID start, int count, String... types);
 
     /**
      * Remove the slice from the edge meta data from the types.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
index 50e8373..c1fffa1 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
@@ -82,8 +82,8 @@ public class NodeSerializationImpl implements NodeSerialization, Migration {
      * Columns are always a byte, and the entire value is contained within a row key.  This is intentional
      * This allows us to make heavy use of Cassandra's bloom filters, as well as key caches.
      * Since most nodes will only exist for a short amount of time in this CF, we'll most likely have them in the key
-     * cache, and we'll also bounce from the BloomFilter on read.  This means our performance will be no worse
-     * than checking a distributed cache in RAM for the existence of a marked node.
+     * shard, and we'll also bounce from the BloomFilter on read.  This means our performance will be no worse
+     * than checking a distributed shard in RAM for the existence of a marked node.
      */
     private static final MultiTennantColumnFamily<OrganizationScope, Id, Boolean> GRAPH_DELETE =
             new MultiTennantColumnFamily<OrganizationScope, Id, Boolean>( "Graph_Marked_Nodes",

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/util/IterableUtil.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/util/IterableUtil.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/util/IterableUtil.java
new file mode 100644
index 0000000..d8a843f
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/util/IterableUtil.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.util;
+
+
+import java.util.Iterator;
+
+
+/**
+ * Helper class for wrapping iterators
+
+ */
+public class IterableUtil {
+
+    public static <T> Iterable<T> wrap( final Iterator<T> iterator){
+
+        return new Iterable<T>(){
+
+            @Override
+            public Iterator<T> iterator() {
+                return iterator;
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCacheTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCacheTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCacheTest.java
deleted file mode 100644
index 9ea684e..0000000
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCacheTest.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.graph.impl.cache;
-
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.UUID;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-/**
- * Test for the cache that mocks responses from the serialization
- */
-public class NodeShardCacheTest {
-
-
-    protected OrganizationScope scope;
-
-
-    @Before
-    public void setup() {
-        scope = mock( OrganizationScope.class );
-
-        Id orgId = mock( Id.class );
-
-        when( orgId.getType() ).thenReturn( "organization" );
-        when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
-
-        when( scope.getOrganization() ).thenReturn( orgId );
-    }
-
-
-    @Test
-    public void testNoShards() throws ConnectionException {
-
-        final GraphFig graphFig = getFigMock();
-
-        final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
-
-        final Id id = createId( "test" );
-
-        final String edgeType = "edge";
-
-        final String otherIdType = "type";
-
-
-        UUID newTime = UUIDGenerator.newTimeUUID();
-
-
-        NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
-
-
-        /**
-         * Simulate returning no shards at all.
-         */
-        when( allocation.getShards( same( scope ), same( id ), same( edgeType ), same( otherIdType ) ) )
-                .thenReturn( Collections.singletonList( NodeShardAllocation.MIN_UUID ) );
-
-
-        UUID slice = cache.getSlice( scope, id, newTime, edgeType, otherIdType );
-
-
-        //we return the min UUID possible, all edges should start by writing to this edge
-        assertEquals( NodeShardAllocation.MIN_UUID, slice );
-
-
-        /**
-         * Verify that we fired the audit
-         */
-        verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
-    }
-
-
-    @Test
-    public void testSingleExistingShard() {
-
-        final GraphFig graphFig = getFigMock();
-
-        final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
-
-
-        final Id id = createId( "test" );
-
-        final String edgeType = "edge";
-
-        final String otherIdType = "type";
-
-
-        UUID newTime = UUIDGenerator.newTimeUUID();
-
-        final UUID min = new UUID( 0, 1 );
-
-
-        NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
-
-
-        /**
-         * Simulate returning single shard
-         */
-        when( allocation.getShards( same( scope ), same( id ), same( edgeType ), same( otherIdType ) ) )
-                .thenReturn( Collections.singletonList( min ) );
-
-
-        UUID slice = cache.getSlice( scope, id, newTime, edgeType, otherIdType );
-
-
-        //we return the min UUID possible, all edges should start by writing to this edge
-        assertEquals( min, slice );
-
-        /**
-         * Verify that we fired the audit
-         */
-        verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
-    }
-
-
-    @Test
-    public void testRangeShard() {
-
-        final GraphFig graphFig = getFigMock();
-
-        final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
-
-        final Id id = createId( "test" );
-
-        final String edgeType = "edge";
-
-        final String otherIdType = "type";
-
-
-        /**
-         * Set our min mid and max
-         */
-        final UUID min = new UUID( 0, 1 );
-
-
-        final UUID mid = new UUID( 0, 100 );
-
-
-        final UUID max = new UUID( 0, 200 );
-
-
-        NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
-
-
-        /**
-         * Simulate returning all shards
-         */
-        when( allocation.getShards( same( scope ), same( id ), same( edgeType ), same( otherIdType ) ) )
-                .thenReturn( Arrays.asList( min, mid, max ) );
-
-
-        //check getting equal to our min, mid and max
-
-        UUID slice = cache.getSlice( scope, id, new UUID( min.getMostSignificantBits(), min.getLeastSignificantBits() ),
-                edgeType, otherIdType );
-
-
-        //we return the min UUID possible, all edges should start by writing to this edge
-        assertEquals( min, slice );
-
-        slice = cache.getSlice( scope, id, new UUID( mid.getMostSignificantBits(), mid.getLeastSignificantBits() ),
-                edgeType, otherIdType );
-
-
-        //we return the mid UUID possible, all edges should start by writing to this edge
-        assertEquals( mid, slice );
-
-        slice = cache.getSlice( scope, id, new UUID( max.getMostSignificantBits(), max.getLeastSignificantBits() ),
-                edgeType, otherIdType );
-
-
-        //we return the mid UUID possible, all edges should start by writing to this edge
-        assertEquals( max, slice );
-
-        //now test in between
-        slice = cache.getSlice( scope, id, new UUID( 0, 1 ), edgeType, otherIdType );
-
-
-        //we return the min UUID possible, all edges should start by writing to this edge
-        assertEquals( min, slice );
-
-        slice = cache.getSlice( scope, id, new UUID( 0, 99 ), edgeType, otherIdType );
-
-
-        //we return the min UUID possible, all edges should start by writing to this edge
-        assertEquals( min, slice );
-
-
-        slice = cache.getSlice( scope, id, new UUID( 0, 101 ), edgeType, otherIdType );
-
-
-        //we return the mid UUID possible, all edges should start by writing to this edge
-        assertEquals( mid, slice );
-
-        slice = cache.getSlice( scope, id, new UUID( 0, 199 ), edgeType, otherIdType );
-
-
-        //we return the mid UUID possible, all edges should start by writing to this edge
-        assertEquals( mid, slice );
-
-
-        slice = cache.getSlice( scope, id, new UUID( 0, 201 ), edgeType, otherIdType );
-
-
-        //we return the mid UUID possible, all edges should start by writing to this edge
-        assertEquals( max, slice );
-
-        /**
-         * Verify that we fired the audit
-         */
-        verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
-    }
-
-
-    private GraphFig getFigMock() {
-        final GraphFig graphFig = mock( GraphFig.class );
-        when( graphFig.getCacheSize() ).thenReturn( 1000 );
-        when( graphFig.getCacheTimeout() ).thenReturn( 30000l );
-
-        return graphFig;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheTest.java
new file mode 100644
index 0000000..af14595
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.impl.shard;
+
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.impl.Constants;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Test for the shard that mocks responses from the serialization
+ */
+public class NodeShardCacheTest {
+
+
+    protected OrganizationScope scope;
+
+
+    @Before
+    public void setup() {
+        scope = mock( OrganizationScope.class );
+
+        Id orgId = mock( Id.class );
+
+        when( orgId.getType() ).thenReturn( "organization" );
+        when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+        when( scope.getOrganization() ).thenReturn( orgId );
+    }
+
+
+    @Test
+    public void testNoShards() throws ConnectionException {
+
+        final GraphFig graphFig = getFigMock();
+
+        final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
+
+        final Id id = createId( "test" );
+
+        final String edgeType = "edge";
+
+        final String otherIdType = "type";
+
+
+        UUID newTime = UUIDGenerator.newTimeUUID();
+
+
+        NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
+
+
+        /**
+         * Simulate returning no shards at all.
+         */
+        when( allocation.getShards( same( scope ), same( id ), same(Constants.MAX_UUID), any( Integer.class ), same( edgeType ), same( otherIdType ) ) )
+                .thenReturn( Collections.singletonList( Constants.MIN_UUID ).iterator() );
+
+
+        UUID slice = cache.getSlice( scope, id, newTime, edgeType, otherIdType );
+
+
+        //we return the min UUID possible, all edges should start by writing to this edge
+        assertEquals( Constants.MIN_UUID, slice );
+
+
+        /**
+         * Verify that we fired the audit
+         */
+        verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
+    }
+
+
+    @Test
+    public void testSingleExistingShard() {
+
+        final GraphFig graphFig = getFigMock();
+
+        final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
+
+
+        final Id id = createId( "test" );
+
+        final String edgeType = "edge";
+
+        final String otherIdType = "type";
+
+
+        UUID newTime = UUIDGenerator.newTimeUUID();
+
+        final UUID min = new UUID( 0, 1 );
+
+
+        NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
+
+
+        /**
+         * Simulate returning single shard
+         */
+        when( allocation.getShards( same( scope ), same( id ), same(Constants.MAX_UUID), any( Integer.class ),  same( edgeType ), same( otherIdType ) ) )
+                .thenReturn( Collections.singletonList( min ).iterator() );
+
+
+        UUID slice = cache.getSlice( scope, id, newTime, edgeType, otherIdType );
+
+
+        //we return the min UUID possible, all edges should start by writing to this edge
+        assertEquals( min, slice );
+
+        /**
+         * Verify that we fired the audit
+         */
+        verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
+    }
+
+
+    @Test
+    public void testRangeShard() {
+
+        final GraphFig graphFig = getFigMock();
+
+        final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
+
+        final Id id = createId( "test" );
+
+        final String edgeType = "edge";
+
+        final String otherIdType = "type";
+
+
+        /**
+         * Set our min mid and max
+         */
+        final UUID min = new UUID( 0, 1 );
+
+
+        final UUID mid = new UUID( 0, 100 );
+
+
+        final UUID max = new UUID( 0, 200 );
+
+
+        NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
+
+
+        /**
+         * Simulate returning all shards
+         */
+        when( allocation.getShards( same( scope ), same( id ),  same(Constants.MAX_UUID), any( Integer.class ), same( edgeType ), same( otherIdType ) ) )
+                .thenReturn( Arrays.asList( min, mid, max ).iterator() );
+
+
+        //check getting equal to our min, mid and max
+
+        UUID slice = cache.getSlice( scope, id, new UUID( min.getMostSignificantBits(), min.getLeastSignificantBits() ),
+                edgeType, otherIdType );
+
+
+        //we return the min UUID possible, all edges should start by writing to this edge
+        assertEquals( min, slice );
+
+        slice = cache.getSlice( scope, id, new UUID( mid.getMostSignificantBits(), mid.getLeastSignificantBits() ),
+                edgeType, otherIdType );
+
+
+        //we return the mid UUID possible, all edges should start by writing to this edge
+        assertEquals( mid, slice );
+
+        slice = cache.getSlice( scope, id, new UUID( max.getMostSignificantBits(), max.getLeastSignificantBits() ),
+                edgeType, otherIdType );
+
+
+        //we return the mid UUID possible, all edges should start by writing to this edge
+        assertEquals( max, slice );
+
+        //now test in between
+        slice = cache.getSlice( scope, id, new UUID( 0, 1 ), edgeType, otherIdType );
+
+
+        //we return the min UUID possible, all edges should start by writing to this edge
+        assertEquals( min, slice );
+
+        slice = cache.getSlice( scope, id, new UUID( 0, 99 ), edgeType, otherIdType );
+
+
+        //we return the min UUID possible, all edges should start by writing to this edge
+        assertEquals( min, slice );
+
+
+        slice = cache.getSlice( scope, id, new UUID( 0, 101 ), edgeType, otherIdType );
+
+
+        //we return the mid UUID possible, all edges should start by writing to this edge
+        assertEquals( mid, slice );
+
+        slice = cache.getSlice( scope, id, new UUID( 0, 199 ), edgeType, otherIdType );
+
+
+        //we return the mid UUID possible, all edges should start by writing to this edge
+        assertEquals( mid, slice );
+
+
+        slice = cache.getSlice( scope, id, new UUID( 0, 201 ), edgeType, otherIdType );
+
+
+        //we return the mid UUID possible, all edges should start by writing to this edge
+        assertEquals( max, slice );
+
+        /**
+         * Verify that we fired the audit
+         */
+        verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
+    }
+
+
+    private GraphFig getFigMock() {
+        final GraphFig graphFig = mock( GraphFig.class );
+        when( graphFig.getCacheSize() ).thenReturn( 1000 );
+        when( graphFig.getCacheTimeout() ).thenReturn( 30000l );
+
+        return graphFig;
+    }
+}