You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/04/12 00:21:31 UTC
[5/7] git commit: Updated shard and counter interfaces
Updated shard and counter interfaces
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/45c3ba5b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/45c3ba5b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/45c3ba5b
Branch: refs/heads/asyncqueue
Commit: 45c3ba5b57707752c9907f614deff36adb80ae82
Parents: c178fc6
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Apr 4 17:00:11 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Apr 4 17:00:11 2014 -0700
----------------------------------------------------------------------
stack/corepersistence/graph/pom.xml | 9 +
.../usergrid/persistence/graph/GraphFig.java | 10 +-
.../graph/impl/cache/NodeShardAllocation.java | 71 -----
.../impl/cache/NodeShardAllocationImpl.java | 145 ----------
.../graph/impl/cache/NodeShardCache.java | 43 ---
.../graph/impl/cache/NodeShardCacheImpl.java | 264 -------------------
.../graph/impl/shard/NodeShardAllocation.java | 73 +++++
.../impl/shard/NodeShardAllocationImpl.java | 149 +++++++++++
.../graph/impl/shard/NodeShardCache.java | 43 +++
.../graph/impl/shard/NodeShardCacheImpl.java | 237 +++++++++++++++++
.../impl/shard/TimeShardApproximation.java | 51 ++++
.../impl/shard/TimeShardApproximationImpl.java | 118 +++++++++
.../serialization/EdgeSeriesSerialization.java | 2 +-
.../impl/NodeSerializationImpl.java | 4 +-
.../graph/serialization/util/IterableUtil.java | 43 +++
.../graph/impl/cache/NodeShardCacheTest.java | 257 ------------------
.../graph/impl/shard/NodeShardCacheTest.java | 258 ++++++++++++++++++
17 files changed, 992 insertions(+), 785 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/pom.xml b/stack/corepersistence/graph/pom.xml
index 1abec48..64587a9 100644
--- a/stack/corepersistence/graph/pom.xml
+++ b/stack/corepersistence/graph/pom.xml
@@ -39,6 +39,14 @@
<version>${project.version}</version>
</dependency>
+
+ <!-- utilized for hyperloglog cardinality terms -->
+ <dependency>
+ <groupId>com.clearspring.analytics</groupId>
+ <artifactId>stream</artifactId>
+ <version>2.6.0</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.usergrid</groupId>
<artifactId>collection</artifactId>
@@ -54,6 +62,7 @@
<scope>test</scope>
</dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 43c3b9b..03f0a6e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -49,9 +49,11 @@ public interface GraphFig extends GuicyFig {
public static final String READ_TIMEOUT = "usergrid.graph.read.timeout";
- public static final String CACHE_TIMEOUT = "usergrid.graph.cache.timeout";
+ public static final String CACHE_TIMEOUT = "usergrid.graph.shard.timeout";
- public static final String CACHE_SIZE = "usergrid.graph.cache.size";
+ public static final String CACHE_SIZE = "usergrid.graph.shard.size";
+
+ public static final String COUNTER_PRECISION_LOSS = "usergrid.graph.counter.precision.loss";
@Default( "1000" )
@Key(SCAN_PAGE_SIZE)
@@ -100,5 +102,9 @@ public interface GraphFig extends GuicyFig {
@Default( "30000" )
@Key( CACHE_TIMEOUT )
long getCacheTimeout();
+
+ @Default(".02")
+ @Key(COUNTER_PRECISION_LOSS)
+ double getCounterPrecisionLoss();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocation.java
deleted file mode 100644
index ad4b330..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocation.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.impl.cache;
-
-
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- * Interface used to create and retrieve shards
- */
-public interface NodeShardAllocation {
-
-
-
- /**
- * Get all shards for the given info. If none exist, a default shard should be allocated
- * @param scope
- * @param nodeId
- * @param maxShardId The max value to start seeking from. Values <= this will be returned
- * @param count The count of elements to return
- * @param edgeTypes
- * @return A list of all shards <= the current shard. This will always return MIN_UUID if no shards are allocated
- */
- public List<UUID> getShards(final OrganizationScope scope, final Id nodeId, UUID maxShardId, int count, final String... edgeTypes);
-
-
- /**
- * Audit our highest shard for it's maximum capacity. If it has reached the max capacity <=, it will allocate a new shard
- *
- * @param scope The organization scope
- * @param nodeId The node id
- * @param edgeType The edge types
- * @return True if a new shard was allocated
- */
- public boolean auditMaxShard(final OrganizationScope scope, final Id nodeId, final String... edgeType);
-
-
- /**
- * Increment the shard Id the specified amount
- * @param scope The scope
- * @param nodeId The node id
- * @param shardId The shard id
- * @param count The count
- * @param edgeType The edge type
- */
- public void increment(final OrganizationScope scope, final Id nodeId, final UUID shardId, int count, final String... edgeType);
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocationImpl.java
deleted file mode 100644
index d03ccaa..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocationImpl.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.impl.cache;
-
-
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.serialization.EdgeSeriesCounterSerialization;
-import org.apache.usergrid.persistence.graph.serialization.EdgeSeriesSerialization;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.inject.Inject;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import static org.apache.usergrid.persistence.graph.impl.Constants.MAX_UUID;
-
-
-/**
- * Implementation of the node shard monitor and allocation
- */
-public class NodeShardAllocationImpl implements NodeShardAllocation {
-
-
- private final EdgeSeriesSerialization edgeSeriesSerialization;
- private final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization;
- private final GraphFig graphFig;
-
-
- @Inject
- public NodeShardAllocationImpl( final EdgeSeriesSerialization edgeSeriesSerialization,
- final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization,
- final GraphFig graphFig ) {
- this.edgeSeriesSerialization = edgeSeriesSerialization;
- this.edgeSeriesCounterSerialization = edgeSeriesCounterSerialization;
- this.graphFig = graphFig;
- }
-
-
- @Override
- public List<UUID> getShards( final OrganizationScope scope, final Id nodeId, final UUID maxShardId, final int count,
- final String... edgeTypes ) {
- return edgeSeriesSerialization.getEdgeMetaData( scope, nodeId, maxShardId, count, edgeTypes );
- }
-
-
- @Override
- public boolean auditMaxShard( final OrganizationScope scope, final Id nodeId, final String... edgeType ) {
-
- final UUID now = UUIDGenerator.newTimeUUID();
-
- List<UUID> maxShards = getShards( scope, nodeId, MAX_UUID, 1, edgeType );
-
- //if the first shard has already been allocated, do nothing.
-
- //now is already > than the max, don't do anything
- if ( maxShards.size() > 0 && UUIDComparator.staticCompare( now, maxShards.get( 0 ) ) < 0 ) {
- return false;
- }
-
- //allocate a new shard
- //TODO T.N. modify the time uuid utils to allow future generation, this is incorrect, but a place holder
- final UUID futureUUID = UUIDGenerator.newTimeUUID();
-
- try {
- this.edgeSeriesSerialization.writeEdgeMeta( scope, nodeId, futureUUID, edgeType ).execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to write the new edge metadata" );
- }
-
- UUID max = null;
-
-
- MutationBatch rollup = null;
- boolean completed = false;
-
- //TODO, change this into an iterator
- while ( !completed ) {
-
- List<UUID> shards = getShards( scope, nodeId, MAX_UUID, 100, edgeType );
-
- for ( UUID shardId : shards ) {
- if ( UUIDComparator.staticCompare( shardId, max ) >= 0 ) {
- completed = true;
- break;
- }
-
-
- final MutationBatch batch = edgeSeriesSerialization.removeEdgeMeta( scope, nodeId, shardId, edgeType );
-
- if ( rollup == null ) {
- rollup = batch;
- }
- else {
- rollup.mergeShallow( batch );
- }
- }
-
-
- //while our max value is > than the value we just created, delete it
- }
-
- if(rollup != null){
- try {
- rollup.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to cleanup allocated shards" );
- }
- }
-
- return true;
- }
-
-
- @Override
- public void increment( final OrganizationScope scope, final Id nodeId, final UUID shardId, final int count,
- final String... edgeType ) {
- //delegate
- edgeSeriesCounterSerialization.incrementMetadataCount( scope, nodeId, shardId, count, edgeType );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCache.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCache.java
deleted file mode 100644
index 251ee30..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCache.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.graph.impl.cache;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- * Cache implementation for returning versions based on the slice. This cache may be latent. As a result
- * the allocation of new shards should be 2*cache timeout in the future.
- *
- */
-public interface NodeShardCache {
-
-
- /**
- * Get the time meta data for the given node
- * @param nodeId
- * @param time The time to select the slice for.
- * @param edgeType
- */
- public UUID getSlice(final OrganizationScope scope, final Id nodeId, final UUID time, final String... edgeType);
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCacheImpl.java
deleted file mode 100644
index 943bf5f..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCacheImpl.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.graph.impl.cache;
-
-import static org.apache.usergrid.persistence.graph.impl.Constants.*;
-
-import java.beans.PropertyChangeEvent;
-import java.beans.PropertyChangeListener;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.serialization.EdgeSeriesSerialization;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.inject.Inject;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/**
- * Simple implementation of the cache. Uses a local Guava cache with a timeout. If a value is not present in the
- * cache, it will need to be searched via cassandra.
- */
-public class NodeShardCacheImpl implements NodeShardCache {
-
-
-
- private static final int MAX_SHARD_COUNT = 10000;
-
-
-
- private final NodeShardAllocation nodeShardAllocation;
- private final GraphFig graphFig;
-
- private LoadingCache<CacheKey, CacheEntry> graphs;
-
-
- /**
- *
- * @param nodeShardAllocation
- * @param graphFig
- */
- @Inject
- public NodeShardCacheImpl( final NodeShardAllocation nodeShardAllocation, final GraphFig graphFig ) {
- this.nodeShardAllocation = nodeShardAllocation;
- this.graphFig = graphFig;
-
- /**
- * Add our listener to reconstruct the cache
- */
- this.graphFig.addPropertyChangeListener( new PropertyChangeListener() {
- @Override
- public void propertyChange( final PropertyChangeEvent evt ) {
- final String propertyName = evt.getPropertyName();
-
- if ( propertyName.equals( GraphFig.CACHE_SIZE ) || propertyName.equals( GraphFig.CACHE_TIMEOUT ) ) {
- updateCache();
- }
- }
- } );
-
- /**
- * Initialize the cache
- */
- updateCache();
- }
-
-
- @Override
- public UUID getSlice( final OrganizationScope scope, final Id nodeId, final UUID time, final String... edgeType ) {
-
-
- final CacheKey key = new CacheKey(scope, nodeId, edgeType );
- CacheEntry entry;
-
- try {
- entry = this.graphs.get( key );
- }
- catch ( ExecutionException e ) {
- throw new RuntimeException( "Unable to load cache key for graph", e );
- }
-
- final UUID shardId = entry.getShardId(time);
-
- if(shardId != null){
- return shardId;
- }
-
- //if we get here, something went wrong, our cache should always have a time UUID to return to us
- throw new RuntimeException( "No time UUID shard was found and could not allocate one" );
-
- }
-
-
-
- /**
- * This is a race condition. We could re-init the cache while another thread is reading it. This is fine, the read
- * doesn't have to be precise. The algorithm accounts for stale data.
- */
- private void updateCache() {
-
- this.graphs = CacheBuilder.newBuilder().maximumSize( graphFig.getCacheSize() )
- .expireAfterWrite( graphFig.getCacheTimeout(), TimeUnit.MILLISECONDS )
- .build( new CacheLoader<CacheKey, CacheEntry>() {
-
-
- @Override
- public CacheEntry load( final CacheKey key ) throws Exception {
- //doing this with a static size could result in lost "old" shards, this needs to be an
- //iterating cache that can refresh if we have a full size
- final List<UUID> edges = nodeShardAllocation.getShards( key.scope,
- key.id, MAX_UUID, MAX_SHARD_COUNT, key.types );
-
-
- /**
- * Perform an async audit in case we need to allocate a new shard
- */
- nodeShardAllocation.auditMaxShard( key.scope, key.id, key.types );
-
- return new CacheEntry( edges );
- }
- } );
- }
-
-
-
- /**
- * Cache key for looking up items in the cache
- */
- private static class CacheKey {
- private final OrganizationScope scope;
- private final Id id;
- private final String[] types;
-
-
- private CacheKey( final OrganizationScope scope, final Id id, final String[] types ) {
- this.scope = scope;
- this.id = id;
- this.types = types;
- }
-
-
- @Override
- public boolean equals( final Object o ) {
- if ( this == o ) {
- return true;
- }
- if ( o == null || getClass() != o.getClass() ) {
- return false;
- }
-
- final CacheKey cacheKey = ( CacheKey ) o;
-
- if ( !id.equals( cacheKey.id ) ) {
- return false;
- }
- if ( !Arrays.equals( types, cacheKey.types ) ) {
- return false;
- }
-
- return true;
- }
-
-
- @Override
- public int hashCode() {
- int result = id.hashCode();
- result = 31 * result + Arrays.hashCode( types );
- return result;
- }
- }
-
-
- /**
- * An entry for the cache.
- */
- private static class CacheEntry {
- /**
- * Get the list of all segments
- */
- private List<UUID> shards;
-
-
- private CacheEntry( final List<UUID> shards ) {
- this.shards = shards;
- }
-
-
- /**
- * Get the shard's UUID for the uuid we're attempting to seek from
- *
- * @param seek
- * @return
- */
- public UUID getShardId( final UUID seek) {
- int index = Collections.binarySearch( this.shards, seek, MetaComparator.INSTANCE );
-
- /**
- * We have an exact match, return it
- */
- if(index > -1){
- return shards.get( index );
- }
-
-
- //update the index to represent the index we should insert to and read it. This will be <= the UUID we were passed
- //we subtract 2 to get the index < this one
- index = (index*-1) -2;
-
-
- if(index < shards.size()){
- return shards.get( index );
- }
-
-
- return null;
- }
-
- }
-
-
-
-
- /**
- * UUID Comparator
- */
- private static class MetaComparator implements Comparator<UUID> {
-
- public static final UUIDComparator INSTANCE = new UUIDComparator();
-
-
- @Override
- public int compare( final UUID o1, final UUID o2 ) {
- return com.fasterxml.uuid.UUIDComparator.staticCompare( o1, o2 );
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocation.java
new file mode 100644
index 0000000..bede2ab
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocation.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.shard;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Contract for creating shards for a node's edge types and for reading back the shards that exist.
+ */
+public interface NodeShardAllocation {
+
+    /**
+     * Get all shards for the given info. If none exist, a default shard should be allocated.
+     *
+     * @param scope The organization scope
+     * @param nodeId The node id
+     * @param maxShardId The max value to start seeking from. Values less than or equal to this will be returned
+     * @param pageSize The page size to use in the fetch
+     * @param edgeTypes The edge types
+     *
+     * @return An iterator over all shards less than or equal to maxShardId. This will always return MIN_UUID
+     *         if no shards are allocated
+     */
+    Iterator<UUID> getShards( OrganizationScope scope, Id nodeId, UUID maxShardId, int pageSize,
+                              String... edgeTypes );
+
+
+    /**
+     * Audit our highest shard for its maximum capacity. If it has reached the max capacity, a new shard
+     * will be allocated.
+     *
+     * @param scope The organization scope
+     * @param nodeId The node id
+     * @param edgeType The edge types
+     *
+     * @return True if a new shard was allocated
+     */
+    boolean auditMaxShard( OrganizationScope scope, Id nodeId, String... edgeType );
+
+
+    /**
+     * Increment the count of the given shard id by the specified amount.
+     *
+     * @param scope The scope
+     * @param nodeId The node id
+     * @param shardId The shard id
+     * @param count The amount to increment by
+     * @param edgeType The edge type
+     */
+    void increment( OrganizationScope scope, Id nodeId, UUID shardId, int count, String... edgeType );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationImpl.java
new file mode 100644
index 0000000..e156db6
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardAllocationImpl.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.shard;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.consistency.TimeService;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSeriesCounterSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSeriesSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.apache.usergrid.persistence.graph.impl.Constants.MAX_UUID;
+
+
+/**
+ * Implementation of the node shard monitor and allocation. Shard metadata is read and written through the
+ * {@link EdgeSeriesSerialization}; shard counts are delegated to the {@link EdgeSeriesCounterSerialization}.
+ */
+public class NodeShardAllocationImpl implements NodeShardAllocation {
+
+
+    private final EdgeSeriesSerialization edgeSeriesSerialization;
+    private final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization;
+    private final TimeService timeService;
+    private final GraphFig graphFig;
+
+
+    /**
+     * @param edgeSeriesSerialization Used to read, write and remove shard metadata
+     * @param edgeSeriesCounterSerialization Used to increment shard counts
+     * @param timeService Source of the current time when computing the timestamp of a new shard
+     * @param graphFig Graph configuration; the cache timeout is read from it
+     */
+    @Inject
+    public NodeShardAllocationImpl( final EdgeSeriesSerialization edgeSeriesSerialization,
+                                    final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization,
+                                    final TimeService timeService, final GraphFig graphFig ) {
+        this.edgeSeriesSerialization = edgeSeriesSerialization;
+        this.edgeSeriesCounterSerialization = edgeSeriesCounterSerialization;
+        this.timeService = timeService;
+        this.graphFig = graphFig;
+    }
+
+
+    /**
+     * Delegates directly to {@link EdgeSeriesSerialization#getEdgeMetaData}.
+     */
+    @Override
+    public Iterator<UUID> getShards( final OrganizationScope scope, final Id nodeId, final UUID maxShardId,
+                                     final int pageSize, final String... edgeTypes ) {
+        return edgeSeriesSerialization.getEdgeMetaData( scope, nodeId, maxShardId, pageSize, edgeTypes );
+    }
+
+
+    /**
+     * Checks the highest shard for the node and edge types and, unless a shard newer than "now" already
+     * exists, writes a new shard whose time is the current time plus twice the cache timeout.
+     *
+     * @param scope The organization scope
+     * @param nodeId The node id
+     * @param edgeType The edge types
+     *
+     * @return False if a shard newer than "now" already exists, true after a new shard has been written
+     *
+     * @throws RuntimeException wrapping the {@link ConnectionException} if a write or cleanup fails
+     */
+    @Override
+    public boolean auditMaxShard( final OrganizationScope scope, final Id nodeId, final String... edgeType ) {
+
+        final UUID now = UUIDGenerator.newTimeUUID();
+
+        //only the single highest shard is needed for the check, hence a page size of 1
+        Iterator<UUID> maxShards = getShards( scope, nodeId, MAX_UUID, 1, edgeType );
+
+        //if a shard newer than now has already been allocated, do nothing.
+        if ( maxShards.hasNext() && UUIDComparator.staticCompare( now, maxShards.next() ) < 0 ) {
+            return false;
+        }
+
+        final long newShardTime = timeService.getCurrentTime() + graphFig.getCacheTimeout() * 2;
+
+        //allocate a new shard at least now + 2x our shard timeout. We want to be sure that all replicas pick up
+        //on the new shard
+
+        final UUID futureUUID = UUIDGenerator.newTimeUUID( newShardTime );
+
+        try {
+            this.edgeSeriesSerialization.writeEdgeMeta( scope, nodeId, futureUUID, edgeType ).execute();
+        }
+        catch ( ConnectionException e ) {
+            //keep the cause so the underlying connection failure is not lost
+            throw new RuntimeException( "Unable to write the new edge metadata", e );
+        }
+
+        //NOTE(review): max is never assigned before it is used as the bound in the comparison below, so the
+        //comparison receives null. The intended bound (possibly futureUUID) cannot be determined from this
+        //class alone - confirm and assign it before relying on this cleanup loop.
+        UUID max = null;
+
+
+        MutationBatch rollup = null;
+
+        Iterator<UUID> shards = getShards( scope, nodeId, MAX_UUID, 1000, edgeType );
+
+        while ( shards.hasNext() ) {
+
+            final UUID shardId = shards.next();
+
+            //stop once we reach a shard that is >= the bound
+            if ( UUIDComparator.staticCompare( shardId, max ) >= 0 ) {
+                break;
+            }
+
+
+            //remove the edge that is too large from the node shard allocation
+            final MutationBatch batch = edgeSeriesSerialization.removeEdgeMeta( scope, nodeId, shardId, edgeType );
+
+            //collect all removals into a single batch so they are executed together
+            if ( rollup == null ) {
+                rollup = batch;
+            }
+            else {
+                rollup.mergeShallow( batch );
+            }
+        }
+
+        if ( rollup != null ) {
+            try {
+                rollup.execute();
+            }
+            catch ( ConnectionException e ) {
+                //keep the cause so the underlying connection failure is not lost
+                throw new RuntimeException( "Unable to cleanup allocated shards", e );
+            }
+        }
+
+        return true;
+    }
+
+
+    /**
+     * Delegates directly to {@link EdgeSeriesCounterSerialization#incrementMetadataCount}.
+     */
+    @Override
+    public void increment( final OrganizationScope scope, final Id nodeId, final UUID shardId, final int count,
+                           final String... edgeType ) {
+        //delegate
+        edgeSeriesCounterSerialization.incrementMetadataCount( scope, nodeId, shardId, count, edgeType );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCache.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCache.java
new file mode 100644
index 0000000..7515eb0
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCache.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.impl.shard;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Cache for returning the shard slice that applies to a time. The cached shards may be latent. As a result
+ * the allocation of new shards should be 2*shard timeout in the future.
+ */
+public interface NodeShardCache {
+
+ /**
+ * Get the time meta data for the given node
+ * @param scope The scope passed with the node
+ * @param nodeId The id of the node
+ * @param time The time to select the slice for.
+ * @param edgeType The edge type(s) to select the slice for
+ * @return The UUID of the slice selected for the time
+ */
+ public UUID getSlice(final OrganizationScope scope, final Id nodeId, final UUID time, final String... edgeType);
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheImpl.java
new file mode 100644
index 0000000..055abbc
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheImpl.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.impl.shard;
+
+
+import java.beans.PropertyChangeEvent;
+import java.beans.PropertyChangeListener;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.serialization.util.IterableUtil;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.Inject;
+
+import static org.apache.usergrid.persistence.graph.impl.Constants.MAX_UUID;
+
+
+/**
+ * Simple implementation of the shard cache. Uses a local Guava cache with a timeout. If a value is not present in the
+ * cache, it will need to be searched via cassandra.
+ */
+public class NodeShardCacheImpl implements NodeShardCache {
+
+
+ private static final int SHARD_PAGE_SIZE = 1000;
+
+
+ private final NodeShardAllocation nodeShardAllocation;
+ private final GraphFig graphFig;
+
+ private LoadingCache<CacheKey, CacheEntry> graphs;
+
+
+ /**
+ * Create the cache and rebuild it whenever the cache size or cache timeout configuration changes.
+ * @param nodeShardAllocation Used to load the shards of a node and to audit the max shard
+ * @param graphFig The configuration supplying the cache size and the cache timeout
+ */
+ @Inject
+ public NodeShardCacheImpl( final NodeShardAllocation nodeShardAllocation, final GraphFig graphFig ) {
+ this.nodeShardAllocation = nodeShardAllocation;
+ this.graphFig = graphFig;
+
+ /**
+ * Add our listener to rebuild the cache when its size or timeout changes
+ */
+ this.graphFig.addPropertyChangeListener( new PropertyChangeListener() {
+ @Override
+ public void propertyChange( final PropertyChangeEvent evt ) {
+ final String propertyName = evt.getPropertyName();
+
+ if ( propertyName.equals( GraphFig.CACHE_SIZE ) || propertyName.equals( GraphFig.CACHE_TIMEOUT ) ) {
+ updateCache();
+ }
+ }
+ } );
+
+ /**
+ * Initialize the cache
+ */
+ updateCache();
+ }
+
+
+ @Override
+ public UUID getSlice( final OrganizationScope scope, final Id nodeId, final UUID time, final String... edgeType ) {
+
+ //look up the cached shards for this node and edge types, the cache loads them on a miss
+ final CacheKey key = new CacheKey( scope, nodeId, edgeType );
+ CacheEntry entry;
+
+ try {
+ entry = this.graphs.get( key );
+ }
+ catch ( ExecutionException e ) {
+ throw new RuntimeException( "Unable to load shard key for graph", e );
+ }
+ //the greatest cached shard that is <= time, null when there is no such shard
+ final UUID shardId = entry.getShardId( time );
+
+ if ( shardId != null ) {
+ return shardId;
+ }
+
+ //if we get here, something went wrong, our shard should always have a time UUID to return to us
+ throw new RuntimeException( "No time UUID shard was found and could not allocate one" );
+ }
+
+
+ /**
+ * Replace the cache with a new instance built from the current cache size and cache timeout configuration.
+ * This is a race condition. We could re-init the cache while another thread is reading it. This is fine, the read
+ * doesn't have to be precise. The algorithm accounts for stale data.
+ */
+ private void updateCache() {
+ this.graphs = CacheBuilder.newBuilder().maximumSize( graphFig.getCacheSize() )
+ .expireAfterWrite( graphFig.getCacheTimeout(), TimeUnit.MILLISECONDS )
+ .build( new CacheLoader<CacheKey, CacheEntry>() {
+
+
+ @Override
+ public CacheEntry load( final CacheKey key ) throws Exception {
+
+ //TODO, we need to put some sort of upper bounds on this, it could possibly
+ //get too large
+ final Iterator<UUID> edges = nodeShardAllocation
+ .getShards( key.scope, key.id, MAX_UUID, SHARD_PAGE_SIZE, key.types );
+
+
+ /**
+ * Perform an async audit in case we need to allocate a new shard
+ */
+ nodeShardAllocation.auditMaxShard( key.scope, key.id, key.types );
+
+ return new CacheEntry( edges );
+ }
+ } );
+ }
+
+
+ /**
+ * Cache key for looking up items in the cache. Two keys are equal when the organization of their scope, their
+ * node id and their edge types are all equal.
+ */
+ private static class CacheKey {
+ private final OrganizationScope scope;
+ private final Id id;
+ private final String[] types;
+
+
+ private CacheKey( final OrganizationScope scope, final Id id, final String[] types ) {
+ this.scope = scope;
+ this.id = id;
+ this.types = types;
+ }
+
+
+ @Override
+ public boolean equals( final Object o ) {
+ if ( this == o ) {
+ return true;
+ }
+ if ( o == null || getClass() != o.getClass() ) {
+ return false;
+ }
+
+ final CacheKey cacheKey = ( CacheKey ) o;
+
+ //the scope is part of the identity, otherwise two organizations would share the shards cached for a node
+ if ( !scope.getOrganization().equals( cacheKey.scope.getOrganization() ) ) {
+ return false;
+ }
+
+ return id.equals( cacheKey.id ) && Arrays.equals( types, cacheKey.types );
+ }
+
+
+ @Override
+ public int hashCode() {
+ int result = scope.getOrganization().hashCode();
+ result = 31 * result + id.hashCode();
+ result = 31 * result + Arrays.hashCode( types );
+ return result;
+ }
+ }
+
+
+ /**
+ * An entry in the cache. Holds the shard UUIDs that were read from the iterator it was created with.
+ */
+ private static class CacheEntry {
+ /**
+ * All shards of this entry, sorted by MetaComparator.INSTANCE
+ */
+ private TreeSet<UUID> shards;
+
+ /** Copy every UUID of the iterator into the sorted set */
+ private CacheEntry( final Iterator<UUID> shards ) {
+ this.shards = new TreeSet<>( MetaComparator.INSTANCE );
+
+ for ( UUID shard : IterableUtil.wrap( shards ) ) {
+ this.shards.add( shard );
+ }
+ }
+
+ /**
+ * Get the shard's UUID for the uuid we're attempting to seek from. This is the greatest shard that is less
+ * than or equal to seek, or null when there is no such shard
+ */
+ public UUID getShardId( final UUID seek ) {
+ return this.shards.floor( seek );
+ }
+ }
+
+
+ /**
+ * UUID Comparator, orders UUIDs with UUIDComparator.staticCompare
+ */
+ private static class MetaComparator implements Comparator<UUID> {
+
+ /** Shared instance, the comparator holds no state */
+ public static final MetaComparator INSTANCE = new MetaComparator();
+
+ @Override
+ public int compare( final UUID o1, final UUID o2 ) {
+ return UUIDComparator.staticCompare( o1, o2 );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/TimeShardApproximation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/TimeShardApproximation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/TimeShardApproximation.java
new file mode 100644
index 0000000..2fc0565
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/TimeShardApproximation.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.shard;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Interface for creating approximate estimates of shards
+ */
+public interface TimeShardApproximation {
+ /**
+ * Increment the count for the shard Id
+ * @param scope The scope
+ * @param nodeId The node id
+ * @param shardId The shard id
+ * @param edgeType The edge type
+ */
+ public void increment(final OrganizationScope scope, final Id nodeId, final UUID shardId, final String... edgeType);
+
+ /**
+ * Get the approximation of the number of unique items
+ * @param scope The scope
+ * @param nodeId The node id
+ * @param shardId The shard id
+ * @param edgeType The edge type
+ * @return The approximate count
+ */
+ public long getCount(final OrganizationScope scope, final Id nodeId, final UUID shardId, final String... edgeType);
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/TimeShardApproximationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/TimeShardApproximationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/TimeShardApproximationImpl.java
new file mode 100644
index 0000000..f253372
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/shard/TimeShardApproximationImpl.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.shard;
+
+
+import java.nio.charset.Charset;
+import java.util.UUID;
+
+import javax.inject.Inject;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.clearspring.analytics.hash.MurmurHash;
+import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+
+
+/**
+ * Implementation for doing approximation. Uses HyperLogLog.
+ *
+ *
+ * http://blog.aggregateknowledge.com/2012/10/25/sketch-of-the-day-hyperloglog-cornerstone-of-a-big-data-infrastructure/
+ *
+ * See also
+ *
+ * http://blog.aggregateknowledge.com/2012/10/25/sketch-of-the-day-hyperloglog-cornerstone-of-a-big-data-infrastructure/
+ *
+ * See also
+ *
+ * https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/cardinality
+ * /HyperLogLog.java
+ */
+
+
+public class TimeShardApproximationImpl implements TimeShardApproximation {
+
+ //TODO T.N. refactor into an expiring local cache. We need each HyperLogLog to be its own instance of a given shard
+ //if this is to work
+ private static final String UTF_8 = "UTF-8";
+ private static final Charset CHARSET = Charset.forName( UTF_8 );
+
+ /**
+ * We generate a new time uuid every time a new instance is started. We should never re-use an instance.
+ */
+ private final UUID identity = UUIDGenerator.newTimeUUID();
+ private final HyperLogLog hyperLogLog;
+
+
+ /**
+ * Create a time shard approximation whose HyperLogLog is configured with graphFig.getCounterPrecisionLoss().
+ */
+ @Inject
+ public TimeShardApproximationImpl( final GraphFig graphFig ) {
+ hyperLogLog = new HyperLogLog( graphFig.getCounterPrecisionLoss() );
+ }
+
+
+ @Override
+ public void increment( final OrganizationScope scope, final Id nodeId, final UUID shardId,
+ final String... edgeType ) {
+ //the bytes are built from the scope, node, shard id and edge types only
+ byte[] hash = hash( scope, nodeId, shardId, edgeType );
+
+ //reduce the bytes to the 64 bit hash that is offered to the HyperLogLog
+ long longHash = MurmurHash.hash64( hash, hash.length );
+
+ //NOTE(review): every call for the same shard offers the same hash, see the TODO on this class
+ hyperLogLog.offerHashed( longHash );
+ }
+
+
+ @Override
+ public long getCount( final OrganizationScope scope, final Id nodeId, final UUID shardId,
+ final String... edgeType ) {
+ return 0; //TODO not implemented yet, always returns 0 and never reads the HyperLogLog
+ }
+
+
+ private byte[] hash( final OrganizationScope scope, final Id nodeId, final UUID shardId,
+ final String... edgeTypes ) {
+ //every field is terminated with a NUL so that adjacent fields can't run together, without it the
+ //edge types ("ab", "c") and ("a", "bc") would produce the same bytes
+ final char separator = '\0';
+ final StringBuilder builder = new StringBuilder();
+ final Id organization = scope.getOrganization();
+
+ builder.append( organization.getUuid() ).append( separator );
+ builder.append( organization.getType() ).append( separator );
+ builder.append( nodeId.getUuid() ).append( separator );
+ builder.append( nodeId.getType() ).append( separator );
+ builder.append( shardId.toString() ).append( separator );
+
+ for ( String edgeType : edgeTypes ) {
+ builder.append( edgeType ).append( separator );
+ }
+
+ return builder.toString().getBytes( CHARSET );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java
index afa0041..17c342f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java
@@ -53,7 +53,7 @@ public interface EdgeSeriesSerialization {
* @param types The types to use
* @return
*/
- public List<UUID> getEdgeMetaData(OrganizationScope scope, Id nodeId, UUID start, int count, String... types);
+ public Iterator<UUID> getEdgeMetaData(OrganizationScope scope, Id nodeId, UUID start, int count, String... types);
/**
* Remove the slice from the edge meta data from the types.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
index 50e8373..c1fffa1 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
@@ -82,8 +82,8 @@ public class NodeSerializationImpl implements NodeSerialization, Migration {
* Columns are always a byte, and the entire value is contained within a row key. This is intentional
* This allows us to make heavy use of Cassandra's bloom filters, as well as key caches.
* Since most nodes will only exist for a short amount of time in this CF, we'll most likely have them in the key
- * cache, and we'll also bounce from the BloomFilter on read. This means our performance will be no worse
- * than checking a distributed cache in RAM for the existence of a marked node.
+ * shard, and we'll also bounce from the BloomFilter on read. This means our performance will be no worse
+ * than checking a distributed shard in RAM for the existence of a marked node.
*/
private static final MultiTennantColumnFamily<OrganizationScope, Id, Boolean> GRAPH_DELETE =
new MultiTennantColumnFamily<OrganizationScope, Id, Boolean>( "Graph_Marked_Nodes",
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/util/IterableUtil.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/util/IterableUtil.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/util/IterableUtil.java
new file mode 100644
index 0000000..d8a843f
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/util/IterableUtil.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.util;
+
+
+import java.util.Iterator;
+
+
+/**
+ * Helper class for wrapping iterators
+ */
+public class IterableUtil {
+
+ /**
+ * Wrap the iterator so it can be used in a for-each loop. Every call to iterator() returns the same
+ * instance that was passed in, so the returned Iterable can only be traversed once.
+ */
+ public static <T> Iterable<T> wrap( final Iterator<T> iterator){
+ return new Iterable<T>(){
+ @Override
+ public Iterator<T> iterator() {
+ return iterator;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCacheTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCacheTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCacheTest.java
deleted file mode 100644
index 9ea684e..0000000
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCacheTest.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.graph.impl.cache;
-
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.UUID;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-/**
- * Test for the cache that mocks responses from the serialization
- */
-public class NodeShardCacheTest {
-
-
- protected OrganizationScope scope;
-
-
- @Before
- public void setup() {
- scope = mock( OrganizationScope.class );
-
- Id orgId = mock( Id.class );
-
- when( orgId.getType() ).thenReturn( "organization" );
- when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
-
- when( scope.getOrganization() ).thenReturn( orgId );
- }
-
-
- @Test
- public void testNoShards() throws ConnectionException {
-
- final GraphFig graphFig = getFigMock();
-
- final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
-
- final Id id = createId( "test" );
-
- final String edgeType = "edge";
-
- final String otherIdType = "type";
-
-
- UUID newTime = UUIDGenerator.newTimeUUID();
-
-
- NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
-
-
- /**
- * Simulate returning no shards at all.
- */
- when( allocation.getShards( same( scope ), same( id ), same( edgeType ), same( otherIdType ) ) )
- .thenReturn( Collections.singletonList( NodeShardAllocation.MIN_UUID ) );
-
-
- UUID slice = cache.getSlice( scope, id, newTime, edgeType, otherIdType );
-
-
- //we return the min UUID possible, all edges should start by writing to this edge
- assertEquals( NodeShardAllocation.MIN_UUID, slice );
-
-
- /**
- * Verify that we fired the audit
- */
- verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
- }
-
-
- @Test
- public void testSingleExistingShard() {
-
- final GraphFig graphFig = getFigMock();
-
- final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
-
-
- final Id id = createId( "test" );
-
- final String edgeType = "edge";
-
- final String otherIdType = "type";
-
-
- UUID newTime = UUIDGenerator.newTimeUUID();
-
- final UUID min = new UUID( 0, 1 );
-
-
- NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
-
-
- /**
- * Simulate returning single shard
- */
- when( allocation.getShards( same( scope ), same( id ), same( edgeType ), same( otherIdType ) ) )
- .thenReturn( Collections.singletonList( min ) );
-
-
- UUID slice = cache.getSlice( scope, id, newTime, edgeType, otherIdType );
-
-
- //we return the min UUID possible, all edges should start by writing to this edge
- assertEquals( min, slice );
-
- /**
- * Verify that we fired the audit
- */
- verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
- }
-
-
- @Test
- public void testRangeShard() {
-
- final GraphFig graphFig = getFigMock();
-
- final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
-
- final Id id = createId( "test" );
-
- final String edgeType = "edge";
-
- final String otherIdType = "type";
-
-
- /**
- * Set our min mid and max
- */
- final UUID min = new UUID( 0, 1 );
-
-
- final UUID mid = new UUID( 0, 100 );
-
-
- final UUID max = new UUID( 0, 200 );
-
-
- NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
-
-
- /**
- * Simulate returning all shards
- */
- when( allocation.getShards( same( scope ), same( id ), same( edgeType ), same( otherIdType ) ) )
- .thenReturn( Arrays.asList( min, mid, max ) );
-
-
- //check getting equal to our min, mid and max
-
- UUID slice = cache.getSlice( scope, id, new UUID( min.getMostSignificantBits(), min.getLeastSignificantBits() ),
- edgeType, otherIdType );
-
-
- //we return the min UUID possible, all edges should start by writing to this edge
- assertEquals( min, slice );
-
- slice = cache.getSlice( scope, id, new UUID( mid.getMostSignificantBits(), mid.getLeastSignificantBits() ),
- edgeType, otherIdType );
-
-
- //we return the mid UUID possible, all edges should start by writing to this edge
- assertEquals( mid, slice );
-
- slice = cache.getSlice( scope, id, new UUID( max.getMostSignificantBits(), max.getLeastSignificantBits() ),
- edgeType, otherIdType );
-
-
- //we return the mid UUID possible, all edges should start by writing to this edge
- assertEquals( max, slice );
-
- //now test in between
- slice = cache.getSlice( scope, id, new UUID( 0, 1 ), edgeType, otherIdType );
-
-
- //we return the min UUID possible, all edges should start by writing to this edge
- assertEquals( min, slice );
-
- slice = cache.getSlice( scope, id, new UUID( 0, 99 ), edgeType, otherIdType );
-
-
- //we return the min UUID possible, all edges should start by writing to this edge
- assertEquals( min, slice );
-
-
- slice = cache.getSlice( scope, id, new UUID( 0, 101 ), edgeType, otherIdType );
-
-
- //we return the mid UUID possible, all edges should start by writing to this edge
- assertEquals( mid, slice );
-
- slice = cache.getSlice( scope, id, new UUID( 0, 199 ), edgeType, otherIdType );
-
-
- //we return the mid UUID possible, all edges should start by writing to this edge
- assertEquals( mid, slice );
-
-
- slice = cache.getSlice( scope, id, new UUID( 0, 201 ), edgeType, otherIdType );
-
-
- //we return the mid UUID possible, all edges should start by writing to this edge
- assertEquals( max, slice );
-
- /**
- * Verify that we fired the audit
- */
- verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
- }
-
-
- private GraphFig getFigMock() {
- final GraphFig graphFig = mock( GraphFig.class );
- when( graphFig.getCacheSize() ).thenReturn( 1000 );
- when( graphFig.getCacheTimeout() ).thenReturn( 30000l );
-
- return graphFig;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45c3ba5b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheTest.java
new file mode 100644
index 0000000..af14595
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/shard/NodeShardCacheTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.impl.shard;
+
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.impl.Constants;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests for the node shard cache, using a mocked NodeShardAllocation in place of real serialization
+ */
+public class NodeShardCacheTest {
+
+
+ protected OrganizationScope scope;
+
+
+ @Before
+ public void setup() {
+ scope = mock( OrganizationScope.class );
+
+ Id orgId = mock( Id.class );
+
+ when( orgId.getType() ).thenReturn( "organization" );
+ when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+ when( scope.getOrganization() ).thenReturn( orgId );
+ }
+
+
+ @Test
+ public void testNoShards() throws ConnectionException {
+
+ final GraphFig graphFig = getFigMock();
+
+ final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
+
+ final Id id = createId( "test" );
+
+ final String edgeType = "edge";
+
+ final String otherIdType = "type";
+
+
+ UUID newTime = UUIDGenerator.newTimeUUID();
+
+
+ NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
+
+
+ /**
+ * Simulate no allocated shards: the allocation returns only the default MIN_UUID shard.
+ */
+ when( allocation.getShards( same( scope ), same( id ), same(Constants.MAX_UUID), any( Integer.class ), same( edgeType ), same( otherIdType ) ) )
+ .thenReturn( Collections.singletonList( Constants.MIN_UUID ).iterator() );
+
+
+ UUID slice = cache.getSlice( scope, id, newTime, edgeType, otherIdType );
+
+
+ //we return the min UUID possible, all edges should start by writing to this edge
+ assertEquals( Constants.MIN_UUID, slice );
+
+
+ /**
+ * Verify that we fired the audit
+ */
+ verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
+ }
+
+
+ @Test
+ public void testSingleExistingShard() {
+
+ final GraphFig graphFig = getFigMock();
+
+ final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
+
+
+ final Id id = createId( "test" );
+
+ final String edgeType = "edge";
+
+ final String otherIdType = "type";
+
+
+ UUID newTime = UUIDGenerator.newTimeUUID();
+
+ final UUID min = new UUID( 0, 1 );
+
+
+ NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
+
+
+ /**
+ * Simulate returning single shard
+ */
+ when( allocation.getShards( same( scope ), same( id ), same(Constants.MAX_UUID), any( Integer.class ), same( edgeType ), same( otherIdType ) ) )
+ .thenReturn( Collections.singletonList( min ).iterator() );
+
+
+ UUID slice = cache.getSlice( scope, id, newTime, edgeType, otherIdType );
+
+
+ //we return the min UUID possible, all edges should start by writing to this edge
+ assertEquals( min, slice );
+
+ /**
+ * Verify that we fired the audit
+ */
+ verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
+ }
+
+
+ @Test
+ public void testRangeShard() {
+
+ final GraphFig graphFig = getFigMock();
+
+ final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
+
+ final Id id = createId( "test" );
+
+ final String edgeType = "edge";
+
+ final String otherIdType = "type";
+
+
+ /**
+ * Set our min, mid and max
+ */
+ final UUID min = new UUID( 0, 1 );
+
+
+ final UUID mid = new UUID( 0, 100 );
+
+
+ final UUID max = new UUID( 0, 200 );
+
+
+ NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
+
+
+ /**
+ * Simulate returning all shards
+ */
+ when( allocation.getShards( same( scope ), same( id ), same(Constants.MAX_UUID), any( Integer.class ), same( edgeType ), same( otherIdType ) ) )
+ .thenReturn( Arrays.asList( min, mid, max ).iterator() );
+
+
+ //check getting equal to our min, mid and max
+
+ UUID slice = cache.getSlice( scope, id, new UUID( min.getMostSignificantBits(), min.getLeastSignificantBits() ),
+ edgeType, otherIdType );
+
+
+ //we return the min UUID possible, all edges should start by writing to this edge
+ assertEquals( min, slice );
+
+ slice = cache.getSlice( scope, id, new UUID( mid.getMostSignificantBits(), mid.getLeastSignificantBits() ),
+ edgeType, otherIdType );
+
+
+ //we return the mid UUID possible, all edges should start by writing to this edge
+ assertEquals( mid, slice );
+
+ slice = cache.getSlice( scope, id, new UUID( max.getMostSignificantBits(), max.getLeastSignificantBits() ),
+ edgeType, otherIdType );
+
+
+ //we return the max UUID possible, all edges should start by writing to this edge
+ assertEquals( max, slice );
+
+ //now test in between
+ slice = cache.getSlice( scope, id, new UUID( 0, 1 ), edgeType, otherIdType );
+
+
+ //we return the min UUID possible, all edges should start by writing to this edge
+ assertEquals( min, slice );
+
+ slice = cache.getSlice( scope, id, new UUID( 0, 99 ), edgeType, otherIdType );
+
+
+ //we return the min UUID possible, all edges should start by writing to this edge
+ assertEquals( min, slice );
+
+
+ slice = cache.getSlice( scope, id, new UUID( 0, 101 ), edgeType, otherIdType );
+
+
+ //we return the mid UUID possible, all edges should start by writing to this edge
+ assertEquals( mid, slice );
+
+ slice = cache.getSlice( scope, id, new UUID( 0, 199 ), edgeType, otherIdType );
+
+
+ //we return the mid UUID possible, all edges should start by writing to this edge
+ assertEquals( mid, slice );
+
+
+ slice = cache.getSlice( scope, id, new UUID( 0, 201 ), edgeType, otherIdType );
+
+
+ //we return the max UUID possible, all edges should start by writing to this edge
+ assertEquals( max, slice );
+
+ /**
+ * Verify that we fired the audit
+ */
+ verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
+ }
+
+
+ private GraphFig getFigMock() {
+ final GraphFig graphFig = mock( GraphFig.class );
+ when( graphFig.getCacheSize() ).thenReturn( 1000 );
+ when( graphFig.getCacheTimeout() ).thenReturn( 30000l );
+
+ return graphFig;
+ }
+}