You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/29 19:34:01 UTC
[03/20] Implemented new rolling shard algorithm. This should allow
eventual shard consistency between all clients without the need for an
external lock-allocation system.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index cf70669..3c3b99c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -20,27 +20,31 @@
package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
-import java.util.List;
-import org.apache.commons.collections4.iterators.PushbackIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.ShardKey;
-import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.inject.Inject;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
@@ -50,8 +54,13 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
public class NodeShardAllocationImpl implements NodeShardAllocation {
+ private static final Logger LOG = LoggerFactory.getLogger( NodeShardAllocationImpl.class );
+
+ private static final Shard MIN_SHARD = new Shard(0, 0, true);
+
private final EdgeShardSerialization edgeShardSerialization;
-// private final NodeShardCounterSerialization edgeShardCounterSerialization;
+ private final EdgeColumnFamilies edgeColumnFamilies;
+ private final ShardedEdgeSerialization shardedEdgeSerialization;
private final NodeShardApproximation nodeShardApproximation;
private final TimeService timeService;
private final GraphFig graphFig;
@@ -59,9 +68,13 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
@Inject
public NodeShardAllocationImpl( final EdgeShardSerialization edgeShardSerialization,
- final NodeShardApproximation nodeShardApproximation,
- final TimeService timeService, final GraphFig graphFig ) {
+ final EdgeColumnFamilies edgeColumnFamilies,
+ final ShardedEdgeSerialization shardedEdgeSerialization,
+ final NodeShardApproximation nodeShardApproximation, final TimeService timeService,
+ final GraphFig graphFig ) {
this.edgeShardSerialization = edgeShardSerialization;
+ this.edgeColumnFamilies = edgeColumnFamilies;
+ this.shardedEdgeSerialization = shardedEdgeSerialization;
this.nodeShardApproximation = nodeShardApproximation;
this.timeService = timeService;
this.graphFig = graphFig;
@@ -69,109 +82,115 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
@Override
- public Iterator<Long> getShards( final ApplicationScope scope, final Id nodeId, Optional<Long> maxShardId, final String... edgeTypes ) {
-
- final Iterator<Long> existingShards =
- edgeShardSerialization.getEdgeMetaData( scope, nodeId, maxShardId, edgeTypes );
-
- final PushbackIterator<Long> pushbackIterator = new PushbackIterator( existingShards );
-//
-//
-// final long now = timeService.getCurrentTime();
-//
-//
-// final List<Long> futures = new ArrayList<Long>();
-//
-//
-// //loop through all shards, any shard > now+1 should be deleted
-// while ( pushbackIterator.hasNext() ) {
-//
-// final Long value = pushbackIterator.next();
-//
-// //we're done, our current time uuid is greater than the value stored
-// if ( now >= value ) {
-// //push it back into the iterator
-// pushbackIterator.pushback( value );
-// break;
-// }
-//
-// futures.add( value );
-// }
-//
-//
-// //we have more than 1 future value, we need to remove it
-//
-// MutationBatch cleanup = keyspace.prepareMutationBatch();
-//
-// //remove all futures except the last one, it is the only value we shouldn't lazy remove
-// for ( int i = 0; i < futures.size() -1; i++ ) {
-// final long toRemove = futures.get( i );
-//
-// final MutationBatch batch = edgeShardSerialization.removeEdgeMeta( scope, nodeId, toRemove, edgeTypes );
-//
-// cleanup.mergeShallow( batch );
-// }
-//
-//
-// try {
-// cleanup.execute();
-// }
-// catch ( ConnectionException e ) {
-// throw new GraphRuntimeException( "Unable to remove future shards, mutation error", e );
-// }
-//
-//
-// final int futuresSize = futures.size();
-//
-// if ( futuresSize > 0 ) {
-// pushbackIterator.pushback( futures.get( futuresSize - 1 ) );
-// }
-//
-//
- /**
- * Nothing to iterate, return an iterator with 0.
- */
- if(!pushbackIterator.hasNext()){
- pushbackIterator.pushback( 0l );
+    public Iterator<ShardEntryGroup> getShards( final ApplicationScope scope,
+                                                final Optional<Shard> maxShardId,
+                                                final DirectedEdgeMeta directedEdgeMeta ) {
+        //returns the shard entry groups for this edge meta data, persisting MIN_SHARD first if none are stored
+
+        ValidationUtils.validateApplicationScope( scope );
+        Preconditions.checkNotNull( maxShardId, "maxShardId cannot be null" );
+        GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta );
+
+        Iterator<Shard> existingShards;
+
+        //its a new node: its shard meta data cannot exist yet, so skip the cassandra read
+        if ( isNewNode( directedEdgeMeta ) ) {
+            existingShards = Collections.singleton( MIN_SHARD ).iterator();
+        }
+        else {
+            existingShards = edgeShardSerialization.getShardMetaData( scope, maxShardId, directedEdgeMeta );
+        }
+
+        //no shards are stored for this meta data yet: persist MIN_SHARD and use it as the only shard
+        if ( existingShards == null || !existingShards.hasNext() ) {
+
+            try {
+                edgeShardSerialization.writeShardMeta( scope, MIN_SHARD, directedEdgeMeta ).execute();
+            }
+            catch ( ConnectionException e ) {
+                //keep the cause so the underlying connection failure is visible to callers and in logs
+                throw new GraphRuntimeException( "Unable to allocate minimum shard", e );
+            }
+
+            existingShards = Collections.singleton( MIN_SHARD ).iterator();
}
- return pushbackIterator;
+        //combine the shards into entry groups using the configured minimum delta
+        return new ShardEntryGroupIterator( existingShards, graphFig.getShardMinDelta() );
}
@Override
- public boolean auditMaxShard( final ApplicationScope scope, final Id nodeId, final String... edgeType ) {
+ public boolean auditShard( final ApplicationScope scope, final ShardEntryGroup shardEntryGroup, final DirectedEdgeMeta directedEdgeMeta) {
- final Iterator<Long> maxShards = getShards( scope, nodeId, Optional.<Long>absent(), edgeType );
+ ValidationUtils.validateApplicationScope(scope);
+ GraphValidation.validateShardEntryGroup( shardEntryGroup );
+ GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta );
+ Preconditions.checkNotNull( shardEntryGroup, "shardEntryGroup cannot be null" );
- //if the first shard has already been allocated, do nothing.
- //now is already > than the max, don't do anything
- if ( !maxShards.hasNext() ) {
+ /**
+ * Nothing to do, it's been created very recently, we don't create a new one
+ */
+ if (shardEntryGroup.isCompactionPending()) {
+ return false;
+ }
+
+ //we can't allocate, we have more than 1 write shard currently. We need to compact first
+ if(shardEntryGroup.entrySize() != 1){
+ return false;
+ }
+
+
+ /**
+ * Check the min shard in our system
+ */
+ final Shard shard = shardEntryGroup.getMinShard();
+
+
+ if (shard.getCreatedTime() >= getMinTime()){
return false;
}
- final long maxShard = maxShards.next();
/**
* Check out if we have a count for our shard allocation
*/
+ final long count =
+ nodeShardApproximation.getCount( scope, shard, directedEdgeMeta);
- final long count = nodeShardApproximation.getCount( scope, nodeId, maxShard, edgeType );
if ( count < graphFig.getShardSize() ) {
return false;
}
- //try to get a lock here, and fail if one isn't present
- final long newShardTime = timeService.getCurrentTime() + graphFig.getShardCacheTimeout() * 2;
+
+
+ /**
+ * Allocate the shard
+ */
+
+ final Iterator<MarkedEdge> edges = directedEdgeMeta.loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup, Long.MAX_VALUE );
+
+
+ if ( !edges.hasNext() ) {
+ LOG.warn( "Tried to allocate a new shard for edge meta data {}, "
+ + "but no max value could be found in that row", directedEdgeMeta );
+ return false;
+ }
+
+ //we have a next, allocate it based on the max
+
+ MarkedEdge marked = edges.next();
+
+ final long createTimestamp = timeService.getCurrentTime();
+
+ final Shard newShard = new Shard(marked.getTimestamp(), createTimestamp, false);
try {
- this.edgeShardSerialization.writeEdgeMeta( scope, nodeId, newShardTime, edgeType ).execute();
+ this.edgeShardSerialization
+ .writeShardMeta( scope, newShard, directedEdgeMeta )
+ .execute();
}
catch ( ConnectionException e ) {
throw new GraphRuntimeException( "Unable to write the new edge metadata" );
@@ -181,4 +200,50 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
return true;
}
+
+    /**
+     * Computes the allocation cutoff time as now minus the configured shard min delta, after checking that the
+     * delta is at least twice the shard cache timeout.
+     *
+     * @throws GraphRuntimeException if the min delta is smaller than 2 x the cache timeout
+     */
+    @Override
+    public long getMinTime() {
+        final long lowestPermittedDelta = 2 * graphFig.getShardCacheTimeout();
+        final long minDelta = graphFig.getShardMinDelta();
+
+        //a sufficiently large delta is the normal case: return the cutoff right away
+        if ( minDelta >= lowestPermittedDelta ) {
+            return timeService.getCurrentTime() - minDelta;
+        }
+
+        //per the message below, a delta smaller than two cache timeouts risks losing data
+        throw new GraphRuntimeException( String.format(
+                "You must configure the property %s to be >= 2 x %s. Otherwise you risk losing data",
+                GraphFig.SHARD_MIN_DELTA, GraphFig.SHARD_CACHE_TIMEOUT ) );
+    }
+
+
+    /**
+     * Return true if any node in the edge meta data was created within the shard cache timeout. If this is the
+     * case we don't need to check cassandra; its shard meta data won't exist yet.
+     *
+     * @param directedEdgeMeta the edge meta data whose node ids are inspected
+     * @return true if at least one node's time uuid is at or after (now - shard cache timeout)
+     */
+    private boolean isNewNode( final DirectedEdgeMeta directedEdgeMeta ) {
+
+        /**
+         * UUID.timestamp() counts 100ns intervals since the UUID epoch (1582-10-15). This is the offset between
+         * that epoch and the unix epoch, in 100ns intervals.
+         */
+        final long uuidEpochOffset = 0x01B21DD213814000L;
+
+        //NOTE(review): assumes TimeService returns unix epoch millis, as the original x10000 conversion implied
+        final long cutoffMillis = timeService.getCurrentTime() - graphFig.getShardCacheTimeout();
+
+        //millis -> 100ns intervals, rebased onto the UUID epoch so it is comparable with timestamp()
+        final long cutoffUuidTime = cutoffMillis * 10000 + uuidEpochOffset;
+
+        for ( DirectedEdgeMeta.NodeMeta node : directedEdgeMeta.getNodes() ) {
+            //timestamp() throws UnsupportedOperationException for non time-based uuids, as before
+            final long uuidTime = node.getId().getUuid().timestamp();
+
+            //created at or after the cutoff: this node is newer than the cache timeout
+            if ( uuidTime >= cutoffUuidTime ) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index 3b78898..d3dd86e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -21,55 +21,77 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
-import java.util.TreeSet;
-import java.util.UUID;
+import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
import org.apache.usergrid.persistence.graph.serialization.util.IterableUtil;
-import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.Weigher;
import com.google.inject.Inject;
-
/**
* Simple implementation of the shard. Uses a local Guava shard with a timeout. If a value is not present in the
* shard, it will need to be searched via cassandra.
*/
public class NodeShardCacheImpl implements NodeShardCache {
+ private static final Logger LOG = LoggerFactory.getLogger( NodeShardCacheImpl.class );
+
+ /**
+ * Only cache shards that have < 10k groups. This is an arbitrary amount, and may change with profiling and
+ * testing
+ */
+ private static final int MAX_WEIGHT_PER_ELEMENT = 10000;
+
private final NodeShardAllocation nodeShardAllocation;
private final GraphFig graphFig;
+ private final TimeService timeservice;
private LoadingCache<CacheKey, CacheEntry> graphs;
/**
- *
- * @param nodeShardAllocation
+ * @param nodeShardAllocation
* @param graphFig
+ * @param timeservice
*/
@Inject
- public NodeShardCacheImpl( final NodeShardAllocation nodeShardAllocation, final GraphFig graphFig ) {
- Preconditions.checkNotNull(nodeShardAllocation, "nodeShardAllocation is required");
- Preconditions.checkNotNull(graphFig, "consistencyFig is required");
+ public NodeShardCacheImpl( final NodeShardAllocation nodeShardAllocation, final GraphFig graphFig,
+ final TimeService timeservice ) {
+
+ Preconditions.checkNotNull( nodeShardAllocation, "nodeShardAllocation is required" );
+ Preconditions.checkNotNull( graphFig, "consistencyFig is required" );
+ Preconditions.checkNotNull( timeservice, "timeservice is required" );
this.nodeShardAllocation = nodeShardAllocation;
this.graphFig = graphFig;
+ this.timeservice = timeservice;
/**
* Add our listener to reconstruct the shard
@@ -79,24 +101,29 @@ public class NodeShardCacheImpl implements NodeShardCache {
public void propertyChange( final PropertyChangeEvent evt ) {
final String propertyName = evt.getPropertyName();
- if ( propertyName.equals( GraphFig.SHARD_CACHE_SIZE ) || propertyName.equals( GraphFig.SHARD_CACHE_TIMEOUT ) ) {
+ if ( propertyName.equals( GraphFig.SHARD_CACHE_SIZE ) || propertyName
+ .equals( GraphFig.SHARD_CACHE_TIMEOUT ) ) {
+
updateCache();
}
}
} );
/**
- * Initialize the shard
+ * Initialize the shard cache
*/
updateCache();
}
@Override
- public long getSlice( final ApplicationScope scope, final Id nodeId, final long timestamp, final String... edgeType ) {
+ public ShardEntryGroup getWriteShardGroup( final ApplicationScope scope, final long timestamp,
+ final DirectedEdgeMeta directedEdgeMeta ) {
+ ValidationUtils.validateApplicationScope(scope);
+ GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta );
- final CacheKey key = new CacheKey( scope, nodeId, edgeType );
+ final CacheKey key = new CacheKey( scope, directedEdgeMeta );
CacheEntry entry;
try {
@@ -106,7 +133,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
throw new GraphRuntimeException( "Unable to load shard key for graph", e );
}
- final Long shardId = entry.getShardId( timestamp );
+ final ShardEntryGroup shardId = entry.getShardId( timestamp );
if ( shardId != null ) {
return shardId;
@@ -118,55 +145,41 @@ public class NodeShardCacheImpl implements NodeShardCache {
@Override
- public Iterator<Long> getVersions( final ApplicationScope scope, final Id nodeId, final long maxTimestamp,
- final String... edgeType ) {
- final CacheKey key = new CacheKey( scope, nodeId, edgeType );
- CacheEntry entry;
-
- try {
- entry = this.graphs.get( key );
- }
- catch ( ExecutionException e ) {
- throw new GraphRuntimeException( "Unable to load shard key for graph", e );
- }
-
- Iterator<Long> iterator = entry.getShards( maxTimestamp );
-
- if(iterator == null){
- return Collections.<Long>emptyList().iterator();
+    public Iterator<ShardEntryGroup> getReadShardGroup( final ApplicationScope scope, final long maxTimestamp,
+                                                        final DirectedEdgeMeta directedEdgeMeta ) {
+        //fail fast on invalid arguments before touching the cache
+        ValidationUtils.validateApplicationScope( scope );
+        GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta );
+
+        final CacheEntry entry;
+        try {
+            //a cache miss is populated by the cache's loader
+            entry = this.graphs.get( new CacheKey( scope, directedEdgeMeta ) );
+        }
+        catch ( ExecutionException e ) {
+            throw new GraphRuntimeException( "Unable to load shard key for graph", e );
+        }
+
+        //groups keyed <= maxTimestamp, highest key first
+        final Iterator<ShardEntryGroup> iterator = entry.getShards( maxTimestamp );
+
+        //defensive: treat a missing iterator as "no groups"
+        if ( iterator == null ) {
+            return Collections.<ShardEntryGroup>emptyList().iterator();
}
return iterator;
}
+
/**
* This is a race condition. We could re-init the shard while another thread is reading it. This is fine, the read
* doesn't have to be precise. The algorithm accounts for stale data.
*/
private void updateCache() {
- this.graphs = CacheBuilder.newBuilder().maximumSize( graphFig.getShardCacheSize() )
- .expireAfterWrite( graphFig.getShardCacheSize(), TimeUnit.MILLISECONDS )
- .build( new CacheLoader<CacheKey, CacheEntry>() {
-
-
- @Override
- public CacheEntry load( final CacheKey key ) throws Exception {
-
-//
-// /**
-// * Perform an audit in case we need to allocate a new shard
-// */
-// nodeShardAllocation.auditMaxShard( key.scope, key.id, key.types );
-// //TODO, we need to put some sort of upper bounds on this, it could possibly get too large
-
-
- final Iterator<Long> edges = nodeShardAllocation
- .getShards( key.scope, key.id, Optional.<Long>absent(), key.types );
-
- return new CacheEntry( edges );
- }
- } );
+        //entries expire after the shard cache timeout, not the cache size; the property listener rebuilds this
+        //cache when SHARD_CACHE_SIZE or SHARD_CACHE_TIMEOUT changes.
+        //NOTE(review): assumes the timeout is configured in milliseconds, matching the TimeUnit used here
+        this.graphs = CacheBuilder.newBuilder()
+                .expireAfterWrite( graphFig.getShardCacheTimeout(), TimeUnit.MILLISECONDS )
+                //audit / compaction checks run when an entry leaves the cache
+                .removalListener( new ShardRemovalListener() )
+                //weight = number of shard groups per entry; widen to long so the product cannot overflow int
+                .maximumWeight( ( long ) MAX_WEIGHT_PER_ELEMENT * graphFig.getShardCacheSize() )
+                .weigher( new ShardWeigher() )
+                .build( new ShardCacheLoader() );
}
@@ -175,14 +188,12 @@ public class NodeShardCacheImpl implements NodeShardCache {
*/
private static class CacheKey {
private final ApplicationScope scope;
- private final Id id;
- private final String[] types;
+ private final DirectedEdgeMeta directedEdgeMeta;
- private CacheKey( final ApplicationScope scope, final Id id, final String[] types ) {
+ private CacheKey( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta ) {
this.scope = scope;
- this.id = id;
- this.types = types;
+ this.directedEdgeMeta = directedEdgeMeta;
}
@@ -197,21 +208,23 @@ public class NodeShardCacheImpl implements NodeShardCache {
final CacheKey cacheKey = ( CacheKey ) o;
- if ( !id.equals( cacheKey.id ) ) {
+ if ( !scope.equals( cacheKey.scope ) ) {
return false;
}
- if ( !Arrays.equals( types, cacheKey.types ) ) {
+
+ if ( !directedEdgeMeta.equals( cacheKey.directedEdgeMeta ) ) {
return false;
}
+
return true;
}
@Override
public int hashCode() {
- int result = id.hashCode();
- result = 31 * result + Arrays.hashCode( types );
+ int result = scope.hashCode();
+ result = 31 * result + directedEdgeMeta.hashCode();
return result;
}
}
@@ -220,38 +233,140 @@ public class NodeShardCacheImpl implements NodeShardCache {
/**
* An entry for the shard.
*/
- private static class CacheEntry {
+    public static final class CacheEntry {
/**
* Get the list of all segments
*/
- private TreeSet<Long> shards;
+        //shard entry groups keyed by the shard index of each group's min shard; assigned once in the constructor
+        private final TreeMap<Long, ShardEntryGroup> shards;
- private CacheEntry( final Iterator<Long> shards ) {
- this.shards = new TreeSet<>( );
+        private CacheEntry( final Iterator<ShardEntryGroup> shards ) {
+            //an entry must hold at least one group, otherwise getShardId() could never resolve a seek
+            Preconditions.checkArgument( shards.hasNext(),
+                    "At least 1 entry must be present in the shard to load into cache" );
- for ( Long shard : IterableUtil.wrap( shards ) ) {
- this.shards.add( shard );
+            this.shards = new TreeMap<>();
+            /**
+             * TODO, we need to bound this. While I don't envision more than a thousand groups max,
+             * we don't want 1 entry to use all our ram
+             */
+            for ( ShardEntryGroup shard : IterableUtil.wrap( shards ) ) {
+                this.shards.put( shard.getMinShard().getShardIndex(), shard );
}
}
/**
- * Get the shard's UUID for the uuid we're attempting to seek from
+         * Return the number of shard entry groups in this entry; used as the entry's cache weight
*/
- public Long getShardId( final Long seek ) {
- return this.shards.floor( seek );
+        public int getCacheSize() {
+            return this.shards.size();
}
/**
* Get all shards <= this one in decending order
- * @return
*/
- public Iterator<Long> getShards( final Long maxShard ){
- return this.shards.headSet(maxShard, true ).descendingIterator();
+        public Iterator<ShardEntryGroup> getShards( final Long maxShard ) {
+            //headMap( maxShard, inclusive ) is exactly the groups keyed <= maxShard, and is simply empty when
+            //maxShard is below the smallest key. Resolving floorKey() first would yield null in that case and
+            //make headMap throw a NullPointerException.
+            return shards.headMap( maxShard, true ).descendingMap().values().iterator();
+        }
+
+
+        /**
+         * Get the shard entry group that should hold this value: the group with the greatest key <= seek.
+         *
+         * @throws NullPointerException if no group key is <= seek, which indicates a bug
+         */
+        public ShardEntryGroup getShardId( final Long seek ) {
+            final Map.Entry<Long, ShardEntryGroup> entry = shards.floorEntry( seek );
+
+            if ( entry == null ) {
+                throw new NullPointerException( "Entry should never be null, this is a bug" );
+            }
+
+            return entry.getValue();
}
}
+    /**
+     * Builds a {@link CacheEntry} on a cache miss by asking the shard allocation for all shard entry groups of
+     * the key's edge meta data.
+     */
+    final class ShardCacheLoader extends CacheLoader<CacheKey, CacheEntry> {
+
+        @Override
+        public CacheEntry load( final CacheKey key ) throws Exception {
+            //no max shard is supplied (Optional.absent())
+            return new CacheEntry(
+                    nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta ) );
+        }
+    }
+
+
+
+
+
+    /**
+     * Weighs a cache entry by the number of shard entry groups it holds, so larger entries count more against
+     * the cache's maximum weight.
+     */
+    final class ShardWeigher implements Weigher<CacheKey, CacheEntry> {
+
+        @Override
+        public int weigh( final CacheKey cacheKey, final CacheEntry cacheEntry ) {
+            final int groupCount = cacheEntry.getCacheSize();
+            return groupCount;
+        }
+    }
+
+
+    /**
+     * When a cache entry is removed, walk its shard entry groups from the highest key down. Groups without a
+     * pending compaction are audited, which may allocate a new shard. Groups with a pending compaction are checked
+     * with shouldCompact(), but launching the compaction is not implemented yet.
+     */
+    final class ShardRemovalListener implements RemovalListener<CacheKey, CacheEntry> {
+
+        @Override
+        public void onRemoval( final RemovalNotification<CacheKey, CacheEntry> notification ) {
+
+            final CacheKey removedKey = notification.getKey();
+            final CacheEntry removedEntry = notification.getValue();
+
+            //Long.MAX_VALUE selects every group, highest key first
+            for ( final Iterator<ShardEntryGroup> it = removedEntry.getShards( Long.MAX_VALUE ); it.hasNext(); ) {
+                final ShardEntryGroup group = it.next();
+
+                if ( group.isCompactionPending() ) {
+                    if ( group.shouldCompact( timeservice.getCurrentTime() ) ) {
+                        //TODO launch the compaction; not implemented yet, so this branch is a no-op
+                    }
+                    //nothing else to do for a group awaiting compaction
+                    continue;
+                }
+
+                //no compaction pending: audit the group, which may allocate a new shard
+                LOG.debug( "No compaction pending, checking max shard on expiration" );
+                nodeShardAllocation.auditShard( removedKey.scope, group, removedKey.directedEdgeMeta );
+            }
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java
new file mode 100644
index 0000000..1edaf21
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+
+
+/**
+ * Serializes {@link RowKey} instances as composites of (node id, edge type, shard id), in that order.
+ */
+public class RowSerializer implements CompositeFieldSerializer<RowKey> {
+
+    private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+    /**
+     * Writes the node id, then the edge type, then the shard id.
+     */
+    @Override
+    public void toComposite( final CompositeBuilder builder, final RowKey key ) {
+        ID_SER.toComposite( builder, key.nodeId );
+        builder.addString( key.edgeType );
+        builder.addLong( key.shardId );
+    }
+
+    /**
+     * Reads the fields back in the same order {@link #toComposite} writes them.
+     */
+    @Override
+    public RowKey fromComposite( final CompositeParser composite ) {
+        final Id nodeId = ID_SER.fromComposite( composite );
+        final String type = composite.readString();
+        final long shardId = composite.readLong();
+
+        //field order must mirror toComposite exactly
+        return new RowKey( nodeId, type, shardId );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java
new file mode 100644
index 0000000..2edea56
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+
+
+/**
+ * Serializes {@link RowKeyType} as a composite of (node id, edge type, id type, shard id), in that order.
+ */
+public class RowTypeSerializer implements CompositeFieldSerializer<RowKeyType> {
+
+    private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+    /** Writes the node id, the edge type, the id type, then the shard id. */
+    @Override
+    public void toComposite( final CompositeBuilder builder, final RowKeyType keyType ) {
+        ID_SER.toComposite( builder, keyType.nodeId );
+        builder.addString( keyType.edgeType );
+        builder.addString( keyType.idType );
+        builder.addLong( keyType.shardId );
+    }
+
+    /**
+     * Reads the fields back in exactly the order {@link #toComposite} writes them.
+     */
+    @Override
+    public RowKeyType fromComposite( final CompositeParser composite ) {
+        final Id nodeId = ID_SER.fromComposite( composite );
+        final String type = composite.readString();
+        final String idTypeName = composite.readString();
+        final long shardId = composite.readLong();
+
+        return new RowKeyType( nodeId, type, idTypeName, shardId );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
new file mode 100644
index 0000000..c8a884b
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
@@ -0,0 +1,100 @@
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.commons.collections4.iterators.PushbackIterator;
+
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Utility class that will take an iterator of all shards, and combine them into an iterator
+ * of ShardEntryGroups. These groups can then be used in a distributed system to handle concurrent reads and writes
+ */
+public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
+
+
+ private ShardEntryGroup next;
+ private final PushbackIterator<Shard> sourceIterator;
+ private final long minDelta;
+
+
+ /**
+ * Create a shard iterator
+ * @param shardIterator The iterator of all shards. Order is expected to be by the shard index from Long.MAX to Long.MIN
+ * @param minDelta The minimum delta we allow to consider shards the same group
+ */
+ public ShardEntryGroupIterator( final Iterator<Shard> shardIterator, final long minDelta ) {
+ Preconditions.checkArgument(shardIterator.hasNext(), "Shard iterator must have shards present");
+ this.sourceIterator = new PushbackIterator( shardIterator );
+ this.minDelta = minDelta;
+ }
+
+
+ @Override
+ public boolean hasNext() {
+ if ( next == null ) {
+ advance();
+ }
+
+ return next != null;
+ }
+
+
+ @Override
+ public ShardEntryGroup next() {
+ if ( !hasNext() ) {
+ throw new NoSuchElementException( "No more elements exist in iterator" );
+ }
+
+
+ final ShardEntryGroup toReturn = next;
+
+ next = null;
+
+ return toReturn;
+ }
+
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException( "Remove is not supported" );
+ }
+
+
+ /**
+ * Advance to the next element
+ */
+ private void advance() {
+
+ /**
+ * We loop through until we've exhausted our source, or we have 2 elements, which means
+ * they're > min time allocation from one another
+ */
+ while ( sourceIterator.hasNext() ) {
+
+ if(next == null){
+ next = new ShardEntryGroup( minDelta );
+ }
+
+ final Shard shard = sourceIterator.next();
+
+
+ //we can't add this one to the entries, it doesn't fit within the delta, allocate a new one and break
+ if ( next.addShard( shard ) ) {
+ continue;
+ }
+
+
+ sourceIterator.pushback( shard );
+ break;
+ }
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
new file mode 100644
index 0000000..48336cd
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Collection;
+
+import org.apache.usergrid.persistence.core.consistency.TimeService;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+
+
+/**
+ * Implementation of the shard group compaction
+ */
+@Singleton
+public class ShardGroupCompactionImpl implements ShardGroupCompaction {
+
+
+ private final TimeService timeService;
+
+
+ @Inject
+ public ShardGroupCompactionImpl( final TimeService timeService ) {this.timeService = timeService;}
+
+
+ @Override
+ public Observable<Integer> compact( final ShardEntryGroup group ) {
+ final long startTime = timeService.getCurrentTime();
+
+ Preconditions.checkNotNull(group, "group cannot be null");
+ Preconditions.checkArgument( group.isCompactionPending(), "Compaction is pending" );
+ Preconditions.checkArgument( group.shouldCompact(startTime ), "Compaction can now be run" );
+
+
+ final Shard targetShard = group.getCompactionTarget();
+
+ final Collection<Shard> sourceShards = group.getReadShards();
+
+
+ //now get iterators for each of the source shards, and then copy them to the compaction target shard
+
+
+
+
+ return null;
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
new file mode 100644
index 0000000..0bbb011
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
@@ -0,0 +1,134 @@
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.MultiKeyColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.model.ConsistencyLevel;
+import com.netflix.astyanax.query.RowQuery;
+import com.netflix.astyanax.util.RangeBuilder;
+
+
+/**
+ * Internal iterator to iterate over multiple row keys
+ *
+ * @param <R> The row type
+ * @param <C> The column type
+ * @param <T> The parsed return type
+ */
+public class ShardRowIterator<R, C, T> implements Iterator<T> {
+
+ private final EdgeSearcher<R, C, T> searcher;
+
+ private final MultiTennantColumnFamily<ApplicationScope, R, C> cf;
+
+ private Iterator<T> currentColumnIterator;
+
+ private final Keyspace keyspace;
+
+ private final int pageSize;
+
+ private final ConsistencyLevel consistencyLevel;
+
+
+ public ShardRowIterator( final EdgeSearcher<R, C, T> searcher,
+ final MultiTennantColumnFamily<ApplicationScope, R, C> cf, final Keyspace keyspace,
+ final ConsistencyLevel consistencyLevel, final int pageSize ) {
+ this.searcher = searcher;
+ this.cf = cf;
+ this.keyspace = keyspace;
+ this.pageSize = pageSize;
+ this.consistencyLevel = consistencyLevel;
+ }
+
+
+ @Override
+ public boolean hasNext() {
+ //we have more columns to return
+ if ( currentColumnIterator != null && currentColumnIterator.hasNext() ) {
+ return true;
+ }
+
+ /**
+ * We have another row key, advance to it and re-check
+ */
+ if ( searcher.hasNext() ) {
+ advanceRow();
+ return hasNext();
+ }
+
+ //we have no more columns, and no more row keys, we're done
+ return false;
+ }
+
+
+ @Override
+ public T next() {
+ if ( !hasNext() ) {
+ throw new NoSuchElementException( "There are no more rows or columns left to advance" );
+ }
+
+ return currentColumnIterator.next();
+ }
+
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException( "Remove is unsupported" );
+ }
+
+
+ /**
+ * Advance our iterator to the next row (assumes the check for row keys is elsewhere)
+ */
+ private void advanceRow() {
+
+ /**
+ * If the edge is present, we need to being seeking from this
+ */
+
+ final RangeBuilder rangeBuilder = new RangeBuilder().setLimit( pageSize );
+
+
+ //set the range into the search
+ searcher.setRange( rangeBuilder );
+
+ /**
+ * Get our list of slices
+ */
+ final List<ScopedRowKey<ApplicationScope, R>> rowKeys = searcher.next();
+
+
+ final List<ColumnNameIterator<C, T>> columnNameIterators = new ArrayList<>( rowKeys.size() );
+
+ for(ScopedRowKey<ApplicationScope, R> rowKey: rowKeys){
+
+
+
+ final RowQuery<ScopedRowKey<ApplicationScope, R>, C> query =
+ keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKey( rowKey )
+ .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
+
+
+ final ColumnNameIterator<C, T> columnNameIterator = new ColumnNameIterator<>( query, searcher, searcher.hasPage() );
+
+ columnNameIterators.add( columnNameIterator );
+
+ }
+
+
+
+ currentColumnIterator = new MultiKeyColumnNameIterator<>(columnNameIterators, searcher, pageSize);
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
new file mode 100644
index 0000000..6f7c5ce
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
@@ -0,0 +1,654 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import javax.inject.Inject;
+
+import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.consistency.TimeService;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.ValidationUtils;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.SearchByIdType;
+import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.Serializer;
+import com.netflix.astyanax.util.RangeBuilder;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
+@Singleton
+public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
+
+ protected final Keyspace keyspace;
+ protected final CassandraConfig cassandraConfig;
+ protected final GraphFig graphFig;
+ protected final EdgeShardStrategy writeEdgeShardStrategy;
+ protected final TimeService timeService;
+
+
+ @Inject
+ public ShardedEdgeSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
+ final GraphFig graphFig, final EdgeShardStrategy writeEdgeShardStrategy,
+ final TimeService timeService ) {
+
+
+ checkNotNull( "keyspace required", keyspace );
+ checkNotNull( "cassandraConfig required", cassandraConfig );
+ checkNotNull( "consistencyFig required", graphFig );
+ checkNotNull( "writeEdgeShardStrategy required", writeEdgeShardStrategy );
+ checkNotNull( "timeService required", timeService );
+
+
+ this.keyspace = keyspace;
+ this.cassandraConfig = cassandraConfig;
+ this.graphFig = graphFig;
+ this.writeEdgeShardStrategy = writeEdgeShardStrategy;
+ this.timeService = timeService;
+ }
+
+
+ @Override
+ public MutationBatch writeEdge( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+ final MarkedEdge markedEdge, final UUID timestamp ) {
+ ValidationUtils.validateApplicationScope( scope );
+ GraphValidation.validateEdge( markedEdge );
+ ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+
+ final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() )
+ .withTimestamp( timestamp.timestamp() );
+
+ final boolean isDeleted = markedEdge.isDeleted();
+
+
+ doWrite( columnFamilies, scope, markedEdge, new RowOp<RowKey>() {
+ @Override
+ public void writeEdge( final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily,
+ final RowKey rowKey, final DirectedEdge edge ) {
+ batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( edge, isDeleted );
+ }
+
+
+ @Override
+ public void countEdge( final Shard shard, final DirectedEdgeMeta directedEdgeMeta ) {
+ if(!isDeleted) {
+ writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
+ }
+ }
+
+
+
+ @Override
+ public void writeVersion( final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily,
+ final EdgeRowKey rowKey, final long timestamp ) {
+ batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( timestamp, isDeleted );
+ }
+ } );
+
+
+ return batch;
+ }
+
+
+ @Override
+ public MutationBatch deleteEdge( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+ final MarkedEdge markedEdge, final UUID timestamp ) {
+ ValidationUtils.validateApplicationScope( scope );
+ GraphValidation.validateEdge( markedEdge );
+ ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+
+ final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() )
+ .withTimestamp( timestamp.timestamp() );
+
+
+ doWrite( columnFamilies, scope, markedEdge, new RowOp<RowKey>() {
+ @Override
+ public void writeEdge( final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily,
+ final RowKey rowKey, final DirectedEdge edge ) {
+ batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( edge );
+ }
+
+
+ @Override
+ public void countEdge( final Shard shard, final DirectedEdgeMeta directedEdgeMeta ) {
+ writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
+ }
+
+
+ @Override
+ public void writeVersion( final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily,
+ final EdgeRowKey rowKey, final long timestamp ) {
+ batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( timestamp );
+ }
+ } );
+
+
+ return batch;
+ }
+
+
+ /**
+ * EdgeWrite the edges internally
+ *
+ * @param scope The scope to encapsulate
+ * @param edge The edge to write
+ * @param op The row operation to invoke
+ */
+ private void doWrite( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope, final MarkedEdge edge,
+ final RowOp op ) {
+ ValidationUtils.validateApplicationScope( scope );
+ GraphValidation.validateEdge( edge );
+
+ final Id sourceNodeId = edge.getSourceNode();
+ final String sourceNodeType = sourceNodeId.getType();
+ final Id targetNodeId = edge.getTargetNode();
+ final String targetNodeType = targetNodeId.getType();
+ final long timestamp = edge.getTimestamp();
+ final String type = edge.getType();
+
+
+ /**
+ * Key in the serializers based on the edge
+ */
+
+
+ /**
+ * write edges from source->target
+ */
+
+
+ final long time = timeService.getCurrentTime();
+
+ final DirectedEdge sourceEdge = new DirectedEdge( targetNodeId, timestamp );
+
+ final DirectedEdgeMeta sourceEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceNodeId, type );
+
+
+
+ final ShardEntryGroup sourceRowKeyShard =
+ writeEdgeShardStrategy.getWriteShards( scope, timestamp, sourceEdgeMeta );
+
+ final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> sourceCf =
+ columnFamilies.getSourceNodeCfName();
+
+
+ for ( Shard shard : sourceRowKeyShard.getWriteShards(time) ) {
+
+ final long shardId = shard.getShardIndex();
+ final RowKey sourceRowKey = new RowKey( sourceNodeId, type, shardId );
+ op.writeEdge( sourceCf, sourceRowKey, sourceEdge );
+ op.countEdge( shard, sourceEdgeMeta );
+ }
+
+
+
+ final DirectedEdgeMeta sourceEdgeTargetTypeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( sourceNodeId,
+ type, targetNodeType );
+
+
+ final ShardEntryGroup sourceWithTypeRowKeyShard = writeEdgeShardStrategy
+ .getWriteShards( scope, timestamp, sourceEdgeTargetTypeMeta );
+
+ final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> targetCf =
+ columnFamilies.getSourceNodeTargetTypeCfName();
+
+ for ( Shard shard : sourceWithTypeRowKeyShard.getWriteShards(time) ) {
+
+ final long shardId = shard.getShardIndex();
+ final RowKeyType sourceRowKeyType = new RowKeyType( sourceNodeId, type, targetNodeId, shardId );
+
+ op.writeEdge( targetCf, sourceRowKeyType, sourceEdge );
+ op.countEdge( shard, sourceEdgeTargetTypeMeta );
+ }
+
+
+ /**
+ * write edges from target<-source
+ */
+
+ final DirectedEdge targetEdge = new DirectedEdge( sourceNodeId, timestamp );
+
+ final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromTargetNode( targetNodeId, type );
+
+
+
+ final ShardEntryGroup targetRowKeyShard =
+ writeEdgeShardStrategy.getWriteShards( scope, timestamp, targetEdgeMeta );
+
+ final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> sourceByTargetCf =
+ columnFamilies.getTargetNodeCfName();
+
+ for ( Shard shard : targetRowKeyShard.getWriteShards(time) ) {
+ final long shardId = shard.getShardIndex();
+ final RowKey targetRowKey = new RowKey( targetNodeId, type, shardId );
+
+ op.writeEdge( sourceByTargetCf, targetRowKey, targetEdge );
+ op.countEdge( shard, targetEdgeMeta );
+ }
+
+
+
+ final DirectedEdgeMeta targetEdgeSourceTypeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( targetNodeId, type, sourceNodeType );
+
+
+ final ShardEntryGroup targetWithTypeRowKeyShard = writeEdgeShardStrategy
+ .getWriteShards( scope, timestamp, targetEdgeSourceTypeMeta );
+
+ final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> targetBySourceCf =
+ columnFamilies.getTargetNodeSourceTypeCfName();
+
+
+ for ( Shard shard : targetWithTypeRowKeyShard.getWriteShards(time) ) {
+
+ final long shardId = shard.getShardIndex();
+
+ final RowKeyType targetRowKeyType = new RowKeyType( targetNodeId, type, sourceNodeId, shardId );
+
+
+ op.writeEdge( targetBySourceCf, targetRowKeyType, targetEdge );
+ op.countEdge( shard, targetEdgeSourceTypeMeta );
+ }
+
+ /**
+ * Always a 0l shard, we're hard limiting 2b timestamps for the same edge
+ */
+ final EdgeRowKey edgeRowKey = new EdgeRowKey( sourceNodeId, type, targetNodeId, 0l );
+
+
+ /**
+ * Write this in the timestamp log for this edge of source->target
+ */
+ op.writeVersion( columnFamilies.getGraphEdgeVersions(), edgeRowKey, timestamp );
+ }
+
+
+ @Override
+ public Iterator<MarkedEdge> getEdgeVersions( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+ final SearchByEdge search, final Iterator<ShardEntryGroup> shards ) {
+ ValidationUtils.validateApplicationScope( scope );
+ GraphValidation.validateSearchByEdge( search );
+
+ final Id targetId = search.targetNode();
+ final Id sourceId = search.sourceNode();
+ final String type = search.getType();
+ final long maxTimestamp = search.getMaxTimestamp();
+ final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily =
+ columnFamilies.getGraphEdgeVersions();
+ final Serializer<Long> serializer = columnFamily.getColumnSerializer();
+
+ final EdgeSearcher<EdgeRowKey, Long, MarkedEdge> searcher =
+ new EdgeSearcher<EdgeRowKey, Long, MarkedEdge>( scope, maxTimestamp, search.last(), shards ) {
+
+ @Override
+ protected Serializer<Long> getSerializer() {
+ return serializer;
+ }
+
+
+ @Override
+ public void setRange( final RangeBuilder builder ) {
+
+
+ if ( last.isPresent() ) {
+ super.setRange( builder );
+ return;
+ }
+
+ //start seeking at a value < our max version
+ builder.setStart( maxTimestamp );
+ }
+
+
+ @Override
+ protected EdgeRowKey generateRowKey( long shard ) {
+ return new EdgeRowKey( sourceId, type, targetId, shard );
+ }
+
+
+ @Override
+ protected Long getStartColumn( final Edge last ) {
+ return last.getTimestamp();
+ }
+
+
+ @Override
+ protected MarkedEdge createEdge( final Long column, final boolean marked ) {
+ return new SimpleMarkedEdge( sourceId, type, targetId, column.longValue(), marked );
+ }
+
+
+ @Override
+ public int compare( final MarkedEdge o1, final MarkedEdge o2 ) {
+ return Long.compare( o1.getTimestamp(), o2.getTimestamp() );
+ }
+ };
+
+ return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+ graphFig.getScanPageSize() );
+ }
+
+
+ @Override
+ public Iterator<MarkedEdge> getEdgesFromSource( final EdgeColumnFamilies columnFamilies,
+ final ApplicationScope scope, final SearchByEdgeType edgeType,
+ final Iterator<ShardEntryGroup> shards ) {
+
+ ValidationUtils.validateApplicationScope( scope );
+ GraphValidation.validateSearchByEdgeType( edgeType );
+
+ final Id sourceId = edgeType.getNode();
+ final String type = edgeType.getType();
+ final long maxTimestamp = edgeType.getMaxTimestamp();
+ final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily =
+ columnFamilies.getSourceNodeCfName();
+ final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
+
+
+ final SourceEdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
+ new SourceEdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+
+
+ @Override
+ protected Serializer<DirectedEdge> getSerializer() {
+ return serializer;
+ }
+
+
+ @Override
+ protected RowKey generateRowKey( long shard ) {
+ return new RowKey( sourceId, type, shard );
+ }
+
+
+ @Override
+ protected DirectedEdge getStartColumn( final Edge last ) {
+ return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
+ }
+
+
+ @Override
+ protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
+ return new SimpleMarkedEdge( sourceId, type, edge.id, edge.timestamp, marked );
+ }
+
+
+ };
+
+
+ return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+ graphFig.getScanPageSize() );
+ }
+
+
+ @Override
+ public Iterator<MarkedEdge> getEdgesFromSourceByTargetType( final EdgeColumnFamilies columnFamilies,
+ final ApplicationScope scope,
+ final SearchByIdType edgeType,
+ final Iterator<ShardEntryGroup> shards ) {
+
+ ValidationUtils.validateApplicationScope( scope );
+ GraphValidation.validateSearchByEdgeType( edgeType );
+
+ final Id targetId = edgeType.getNode();
+ final String type = edgeType.getType();
+ final String targetType = edgeType.getIdType();
+ final long maxTimestamp = edgeType.getMaxTimestamp();
+ final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily =
+ columnFamilies.getSourceNodeTargetTypeCfName();
+ final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
+
+
+ final SourceEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
+ new SourceEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+
+ @Override
+ protected Serializer<DirectedEdge> getSerializer() {
+ return serializer;
+ }
+
+
+ @Override
+ protected RowKeyType generateRowKey( long shard ) {
+ return new RowKeyType( targetId, type, targetType, shard );
+ }
+
+
+ @Override
+ protected DirectedEdge getStartColumn( final Edge last ) {
+ return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
+ }
+
+
+ @Override
+ protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
+ return new SimpleMarkedEdge( targetId, type, edge.id, edge.timestamp, marked );
+ }
+
+
+
+ };
+
+ return new ShardRowIterator( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+ graphFig.getScanPageSize() );
+ }
+
+
+
+
+ @Override
+ public Iterator<MarkedEdge> getEdgesToTarget( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+ final SearchByEdgeType edgeType,
+ final Iterator<ShardEntryGroup> shards ) {
+ ValidationUtils.validateApplicationScope( scope );
+ GraphValidation.validateSearchByEdgeType( edgeType );
+
+ final Id targetId = edgeType.getNode();
+ final String type = edgeType.getType();
+ final long maxTimestamp = edgeType.getMaxTimestamp();
+ final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily =
+ columnFamilies.getTargetNodeCfName();
+ final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
+
+ final TargetEdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
+ new TargetEdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+
+ @Override
+ protected Serializer<DirectedEdge> getSerializer() {
+ return serializer;
+ }
+
+
+ @Override
+ protected RowKey generateRowKey( long shard ) {
+ return new RowKey( targetId, type, shard );
+ }
+
+
+ @Override
+ protected DirectedEdge getStartColumn( final Edge last ) {
+ return new DirectedEdge( last.getSourceNode(), last.getTimestamp() );
+ }
+
+
+ @Override
+ protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
+ return new SimpleMarkedEdge( edge.id, type, targetId, edge.timestamp, marked );
+ }
+ };
+
+
+ return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+ graphFig.getScanPageSize() );
+ }
+
+
+ @Override
+ public Iterator<MarkedEdge> getEdgesToTargetBySourceType( final EdgeColumnFamilies columnFamilies,
+ final ApplicationScope scope,
+ final SearchByIdType edgeType,
+ final Iterator<ShardEntryGroup> shards ) {
+
+ ValidationUtils.validateApplicationScope( scope );
+ GraphValidation.validateSearchByEdgeType( edgeType );
+
+ final Id targetId = edgeType.getNode();
+ final String sourceType = edgeType.getIdType();
+ final String type = edgeType.getType();
+ final long maxTimestamp = edgeType.getMaxTimestamp();
+ final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily =
+ columnFamilies.getTargetNodeSourceTypeCfName();
+ final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
+
+
+ final TargetEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
+ new TargetEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+ @Override
+ protected Serializer<DirectedEdge> getSerializer() {
+ return serializer;
+ }
+
+
+ @Override
+ protected RowKeyType generateRowKey( final long shard ) {
+ return new RowKeyType( targetId, type, sourceType, shard );
+ }
+
+
+ @Override
+ protected DirectedEdge getStartColumn( final Edge last ) {
+ return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
+ }
+
+
+ @Override
+ protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
+ return new SimpleMarkedEdge( edge.id, type, targetId, edge.timestamp, marked );
+ }
+ };
+
+ return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+ graphFig.getScanPageSize() );
+ }
+
+
+ /**
+ * Class for performing searched on rows based on source id
+ */
+ private static abstract class SourceEdgeSearcher<R, C, T extends Edge> extends EdgeSearcher<R, C, T>{
+
+ protected SourceEdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional<Edge> last,
+ final Iterator<ShardEntryGroup> shards ) {
+ super( scope, maxTimestamp, last, shards );
+ }
+
+
+ public int compare( final T o1, final T o2 ) {
+ int compare = Long.compare(o1.getTimestamp(), o2.getTimestamp());
+
+ if(compare == 0){
+ compare = o1.getTargetNode().compareTo( o2.getTargetNode());
+ }
+
+ return compare;
+ }
+
+
+ }
+
+
+ /**
+ * Class for performing searched on rows based on target id
+ */
+ private static abstract class TargetEdgeSearcher<R, C, T extends Edge> extends EdgeSearcher<R, C, T>{
+
+ protected TargetEdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional<Edge> last,
+ final Iterator<ShardEntryGroup> shards ) {
+ super( scope, maxTimestamp, last, shards );
+ }
+
+
+ public int compare( final T o1, final T o2 ) {
+ int compare = Long.compare(o1.getTimestamp(), o2.getTimestamp());
+
+ if(compare == 0){
+ compare = o1.getTargetNode().compareTo( o2.getTargetNode());
+ }
+
+ return compare;
+ }
+
+
+ }
+
+ /**
+ * Simple callback to perform puts and deletes with a common row setup code
+ *
+ * @param <R> The row key type
+ */
+ private static interface RowOp<R> {
+
+ /**
+ * Write the edge with the given data
+ */
+ void writeEdge( final MultiTennantColumnFamily<ApplicationScope, R, DirectedEdge> columnFamily, final R rowKey,
+ final DirectedEdge edge );
+
+ /**
+ * Perform the count on the edge
+ */
+ void countEdge( final Shard shard, final DirectedEdgeMeta directedEdgeMeta);
+
+ /**
+ * Write the edge into the version cf
+ */
+ void writeVersion( final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily,
+ final EdgeRowKey rowKey, long timestamp );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
new file mode 100644
index 0000000..9050b0a
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
@@ -0,0 +1,150 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.DynamicCompositeType;
+
+import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.astyanax.OrganizationScopedRowKeySerializer;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
+
+import com.netflix.astyanax.serializers.LongSerializer;
+
+import static org.apache.usergrid.persistence.core.astyanax.ColumnTypes.LONG_TYPE_REVERSED;
+import static org.apache.usergrid.persistence.core.astyanax.ColumnTypes.UUID_TYPE_REVERSED;
+
+
+/**
+ * Size based implementation of {@link EdgeColumnFamilies}.
+ * <p>
+ * Supplies the edge column families (by source node, by target node, by source node and target type, by target
+ * node and source type), the edge version column family, and the definitions used to create them.
+ */
+public class SizebasedEdgeColumnFamilies implements EdgeColumnFamilies {
+
+
+    //Row key with no type
+    private static final RowSerializer ROW_SERIALIZER = new RowSerializer();
+
+    //Row key with an id type: used for both the source node/target type and target node/source type families
+    private static final RowTypeSerializer ROW_TYPE_SERIALIZER = new RowTypeSerializer();
+
+    //Row key for the edge versions column family
+    private static final EdgeRowKeySerializer EDGE_ROW_KEY_SERIALIZER = new EdgeRowKeySerializer();
+
+    //Edge serializers
+    private static final EdgeSerializer EDGE_SERIALIZER = new EdgeSerializer();
+
+    private static final LongSerializer LONG_SERIALIZER = LongSerializer.get();
+
+    /**
+     * Column comparator for the edge column families. We purposefully associate lower case "l" and "u" with
+     * reversed types. This way we can use the default serialization in Astyanax, but get reverse order in
+     * Cassandra.
+     */
+    private static final String EDGE_DYNAMIC_COMPOSITE_TYPE =
+            DynamicCompositeType.class.getSimpleName() + "(s=>UTF8Type,l=>" + LONG_TYPE_REVERSED + ",u=>"
+                    + UUID_TYPE_REVERSED + ")";
+
+
+    //initialize the CF's from our implementation
+
+    /**
+     * Edges stored under an untyped {@link RowKey} for the source node.
+     */
+    private static final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> SOURCE_NODE_EDGES =
+            new MultiTennantColumnFamily<>( "Graph_Source_Node_Edges",
+                    new OrganizationScopedRowKeySerializer<>( ROW_SERIALIZER ), EDGE_SERIALIZER );
+
+
+    /**
+     * Edges stored under an untyped {@link RowKey} for the target node.
+     */
+    private static final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> TARGET_NODE_EDGES =
+            new MultiTennantColumnFamily<>( "Graph_Target_Node_Edges",
+                    new OrganizationScopedRowKeySerializer<>( ROW_SERIALIZER ), EDGE_SERIALIZER );
+
+
+    /**
+     * Edges stored under a typed {@link RowKeyType} for the source node and target type.
+     */
+    private static final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> SOURCE_NODE_TARGET_TYPE =
+            new MultiTennantColumnFamily<>( "Graph_Source_Node_Target_Type",
+                    new OrganizationScopedRowKeySerializer<>( ROW_TYPE_SERIALIZER ), EDGE_SERIALIZER );
+
+
+    /**
+     * The edges that are to the target node with the source type. The target node is the row key
+     */
+    private static final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> TARGET_NODE_SOURCE_TYPE =
+            new MultiTennantColumnFamily<>( "Graph_Target_Node_Source_Type",
+                    new OrganizationScopedRowKeySerializer<>( ROW_TYPE_SERIALIZER ), EDGE_SERIALIZER );
+
+
+    /**
+     * Edge versions: {@link Long} columns stored under an {@link EdgeRowKey}.
+     */
+    private static final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> EDGE_VERSIONS =
+            new MultiTennantColumnFamily<>( "Graph_Edge_Versions",
+                    new OrganizationScopedRowKeySerializer<>( EDGE_ROW_KEY_SERIALIZER ), LONG_SERIALIZER );
+
+
+    @Override
+    public MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> getSourceNodeCfName() {
+        return SOURCE_NODE_EDGES;
+    }
+
+
+    @Override
+    public MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> getTargetNodeCfName() {
+        return TARGET_NODE_EDGES;
+    }
+
+
+    @Override
+    public MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> getSourceNodeTargetTypeCfName() {
+        return SOURCE_NODE_TARGET_TYPE;
+    }
+
+
+    @Override
+    public MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> getTargetNodeSourceTypeCfName() {
+        return TARGET_NODE_SOURCE_TYPE;
+    }
+
+
+    @Override
+    public MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> getGraphEdgeVersions() {
+        return EDGE_VERSIONS;
+    }
+
+
+    /**
+     * Build the definitions of every column family managed here.
+     *
+     * @return a new list with one definition per column family
+     */
+    @Override
+    public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
+        //The version CF holds Long columns, so it uses a reversed long comparator rather than the edge composite type
+        final MultiTennantColumnFamilyDefinition edgeVersions =
+                new MultiTennantColumnFamilyDefinition( EDGE_VERSIONS, BytesType.class.getSimpleName(),
+                        ColumnTypes.LONG_TYPE_REVERSED, BytesType.class.getSimpleName(),
+                        MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
+
+        return Arrays.asList( graphCf( SOURCE_NODE_EDGES ), graphCf( TARGET_NODE_EDGES ),
+                graphCf( SOURCE_NODE_TARGET_TYPE ), graphCf( TARGET_NODE_SOURCE_TYPE ), edgeVersions );
+    }
+
+
+    /**
+     * Helper to generate the definition of an edge column family, using the dynamic composite edge comparator.
+     *
+     * @param cf the edge column family to define
+     * @return a definition with {@code BytesType} keys and values and
+     * {@link MultiTennantColumnFamilyDefinition.CacheOption#KEYS} caching
+     */
+    private static MultiTennantColumnFamilyDefinition graphCf( final MultiTennantColumnFamily<ApplicationScope, ?, ?> cf ) {
+        return new MultiTennantColumnFamilyDefinition( cf, BytesType.class.getSimpleName(), EDGE_DYNAMIC_COMPOSITE_TYPE,
+                BytesType.class.getSimpleName(), MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
index f246f23..8787d97 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
@@ -23,10 +23,12 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
import java.util.Iterator;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
-import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -52,52 +54,21 @@ public class SizebasedEdgeShardStrategy implements EdgeShardStrategy {
@Override
- public long getWriteShard( final ApplicationScope scope, final Id rowKeyId, final long timestamp,
- final String... types ) {
- return shardCache.getSlice( scope, rowKeyId, timestamp, types );
+ public ShardEntryGroup getWriteShards( final ApplicationScope scope,
+ final long timestamp, final DirectedEdgeMeta directedEdgeMeta ) {
+ return shardCache.getWriteShardGroup( scope, timestamp, directedEdgeMeta);
}
@Override
- public Iterator<Long> getReadShards( final ApplicationScope scope, final Id rowKeyId, final long maxTimestamp,
- final String... types ) {
- return shardCache.getVersions( scope, rowKeyId, maxTimestamp, types );
+ public Iterator<ShardEntryGroup> getReadShards( final ApplicationScope scope, final long maxTimestamp, final DirectedEdgeMeta directedEdgeMeta ) {
+ return shardCache.getReadShardGroup( scope, maxTimestamp, directedEdgeMeta );
}
@Override
- public void increment( final ApplicationScope scope, final Id rowKeyId, final long shardId, final long count,
- final String... types ) {
- shardApproximation.increment( scope, rowKeyId, shardId, count, types );
- }
-
-
- @Override
- public String getSourceNodeCfName() {
- return "Graph_Source_Node_Edges";
- }
-
-
- @Override
- public String getTargetNodeCfName() {
- return "Graph_Target_Node_Edges";
- }
-
-
- @Override
- public String getSourceNodeTargetTypeCfName() {
- return "Graph_Source_Node_Target_Type";
- }
-
-
- @Override
- public String getTargetNodeSourceTypeCfName() {
- return "Graph_Target_Node_Source_Type";
- }
-
-
- @Override
- public String getGraphEdgeVersions() {
- return "Graph_Edge_Versions";
+ public void increment( final ApplicationScope scope, final Shard shard,
+ final long count, final DirectedEdgeMeta directedEdgeMeta) {
+ shardApproximation.increment( scope, shard, count, directedEdgeMeta );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SourceEdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SourceEdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SourceEdgeSearcher.java
new file mode 100644
index 0000000..b33fcaf
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SourceEdgeSearcher.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+public class SourceEdgeSearcher {}