You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/28 05:36:54 UTC
[3/5] Updated OrderedMerge to use a faster implementation at runtime.
After initialization,
it's an O(1) emit operation as long as our producers are fast enough.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java
deleted file mode 100644
index 2edea56..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing,
- * * software distributed under the License is distributed on an
- * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * * KIND, either express or implied. See the License for the
- * * specific language governing permissions and limitations
- * * under the License.
- *
- */
-
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-
-
-import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
-import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.netflix.astyanax.model.CompositeBuilder;
-import com.netflix.astyanax.model.CompositeParser;
-
-
-public class RowTypeSerializer implements CompositeFieldSerializer<RowKeyType> {
-
- private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
-
-
- @Override
- public void toComposite( final CompositeBuilder builder, final RowKeyType keyType ) {
-
- //add the row id to the composite
- ID_SER.toComposite( builder, keyType.nodeId );
-
- builder.addString( keyType.edgeType );
- builder.addString( keyType.idType );
-
- builder.addLong( keyType.shardId );
- }
-
-
- @Override
- public RowKeyType fromComposite( final CompositeParser composite ) {
-
- final Id id = ID_SER.fromComposite( composite );
- final String edgeType = composite.readString();
- final String idType = composite.readString();
- final long shard = composite.readLong();
-
- return new RowKeyType( id, edgeType, idType, shard);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
index c8a884b..f1b5108 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
@@ -6,32 +6,51 @@ import java.util.NoSuchElementException;
import org.apache.commons.collections4.iterators.PushbackIterator;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
import com.google.common.base.Preconditions;
+import rx.schedulers.Schedulers;
+
/**
- * Utility class that will take an iterator of all shards, and combine them into an iterator
- * of ShardEntryGroups. These groups can then be used in a distributed system to handle concurrent reads and writes
+ * Utility class that will take an iterator of all shards, and combine them into an iterator of ShardEntryGroups. These
+ * groups can then be used in a distributed system to handle concurrent reads and writes
*/
public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
- private ShardEntryGroup next;
+ private final ShardGroupCompaction shardGroupCompaction;
private final PushbackIterator<Shard> sourceIterator;
private final long minDelta;
+ private final ApplicationScope scope;
+ private final DirectedEdgeMeta directedEdgeMeta;
+
+
+ private ShardEntryGroup next;
/**
* Create a shard iterator
- * @param shardIterator The iterator of all shards. Order is expected to be by the shard index from Long.MAX to Long.MIN
+ *
+ * @param shardIterator The iterator of all shards. Order is expected to be by the shard index from Long.MAX to
+ * Long.MIN
* @param minDelta The minimum delta we allow to consider shards the same group
*/
- public ShardEntryGroupIterator( final Iterator<Shard> shardIterator, final long minDelta ) {
- Preconditions.checkArgument(shardIterator.hasNext(), "Shard iterator must have shards present");
+ public ShardEntryGroupIterator( final Iterator<Shard> shardIterator, final long minDelta,
+ final ShardGroupCompaction shardGroupCompaction, final ApplicationScope scope,
+ final DirectedEdgeMeta directedEdgeMeta ) {
+
+
+ Preconditions.checkArgument( shardIterator.hasNext(), "Shard iterator must have shards present" );
+ this.scope = scope;
+ this.directedEdgeMeta = directedEdgeMeta;
this.sourceIterator = new PushbackIterator( shardIterator );
+ this.shardGroupCompaction = shardGroupCompaction;
this.minDelta = minDelta;
}
@@ -78,7 +97,7 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
*/
while ( sourceIterator.hasNext() ) {
- if(next == null){
+ if ( next == null ) {
next = new ShardEntryGroup( minDelta );
}
@@ -92,9 +111,13 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
sourceIterator.pushback( shard );
+
break;
}
-
+ //now perform the audit (maybe)
+ if(next != null) {
+ shardGroupCompaction.evaluateShardGroup( scope, directedEdgeMeta, next );
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index c566d43..5076424 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -23,21 +23,32 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
import java.nio.charset.Charset;
-import java.util.BitSet;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.consistency.TimeService;
-import org.apache.usergrid.persistence.core.rx.ObservableIterator;
+import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
@@ -47,20 +58,22 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntry
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import com.google.common.base.Preconditions;
-import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
+import com.google.common.hash.PrimitiveSink;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-
-import rx.Observable;
-import rx.functions.Action0;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
/**
@@ -70,27 +83,33 @@ import rx.schedulers.Schedulers;
public class ShardGroupCompactionImpl implements ShardGroupCompaction {
+ private static final Logger LOG = LoggerFactory.getLogger( ShardGroupCompactionImpl.class );
+
+
private static final Charset CHARSET = Charset.forName( "UTF-8" );
private static final HashFunction MURMUR_128 = Hashing.murmur3_128();
+
+ private final ListeningExecutorService executorService;
private final TimeService timeService;
private final GraphFig graphFig;
private final NodeShardAllocation nodeShardAllocation;
private final ShardedEdgeSerialization shardedEdgeSerialization;
private final EdgeColumnFamilies edgeColumnFamilies;
+ private final Keyspace keyspace;
private final EdgeShardSerialization edgeShardSerialization;
-
private final Random random;
private final ShardCompactionTaskTracker shardCompactionTaskTracker;
+ private final ShardAuditTaskTracker shardAuditTaskTracker;
@Inject
public ShardGroupCompactionImpl( final TimeService timeService, final GraphFig graphFig,
final NodeShardAllocation nodeShardAllocation,
final ShardedEdgeSerialization shardedEdgeSerialization,
- final EdgeColumnFamilies edgeColumnFamilies,
+ final EdgeColumnFamilies edgeColumnFamilies, final Keyspace keyspace,
final EdgeShardSerialization edgeShardSerialization ) {
this.timeService = timeService;
@@ -98,157 +117,343 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
this.nodeShardAllocation = nodeShardAllocation;
this.shardedEdgeSerialization = shardedEdgeSerialization;
this.edgeColumnFamilies = edgeColumnFamilies;
+ this.keyspace = keyspace;
this.edgeShardSerialization = edgeShardSerialization;
this.random = new Random();
this.shardCompactionTaskTracker = new ShardCompactionTaskTracker();
+ this.shardAuditTaskTracker = new ShardAuditTaskTracker();
+
+ executorService = MoreExecutors.listeningDecorator(
+ new MaxSizeThreadPool( graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize() ) );
}
- @Override
- public Set<Shard> compact( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
- final ShardEntryGroup group ) {
+ /**
+ * Execute the compaction task. Will return the status the operations performed
+ *
+ * @param group The shard entry group to compact
+ *
+ * @return The result of the compaction operation
+ */
+ public CompactionResult compact( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+ final ShardEntryGroup group ) {
+
+
final long startTime = timeService.getCurrentTime();
+
Preconditions.checkNotNull( group, "group cannot be null" );
Preconditions.checkArgument( group.isCompactionPending(), "Compaction is pending" );
- Preconditions.checkArgument( group.shouldCompact( startTime ), "Compaction can now be run" );
+ Preconditions.checkArgument( group.shouldCompact( startTime ),
+ "Compaction cannot be run yet. Ignoring compaction." );
- /**
- * It's already compacting, don't do anything
- */
- if (!shardCompactionTaskTracker.shouldStartCompaction( scope, edgeMeta, group )){
- return Collections.emptySet();
- }
+ final CompactionResult.CompactionBuilder resultBuilder = CompactionResult.builder();
final Shard targetShard = group.getCompactionTarget();
- final Collection<Shard> sourceShards = group.getReadShards();
+ final Set<Shard> sourceShards = new HashSet<>( group.getReadShards() );
+ //remove the target
+ sourceShards.remove( targetShard );
- Observable.create( new ObservableIterator<MarkedEdge>( "Shard_Repair" ) {
- @Override
- protected Iterator<MarkedEdge> getIterator() {
- return edgeMeta.loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, group.getReadShards(), Long.MAX_VALUE );
- }
- } ).buffer( graphFig.getScanPageSize() ).doOnNext( new Action1<List<MarkedEdge>>() {
- @Override
- public void call( final List<MarkedEdge> markedEdges ) {
+ final UUID timestamp = UUIDGenerator.newTimeUUID();
- }
- }).doOnNext( new Action1<List<MarkedEdge>>() {
- @Override
- public void call( final List<MarkedEdge> markedEdges ) {
+ final long newShardPivot = targetShard.getShardIndex();
- }
- } );
+ final int maxWorkSize = graphFig.getScanPageSize();
+ final MutationBatch newRowBatch = keyspace.prepareMutationBatch();
+ final MutationBatch deleteRowBatch = keyspace.prepareMutationBatch();
+ /**
+ * As we move edges, we want to keep track of it
+ */
+ long edgeCount = 0;
- return null;
- }
+ for ( Shard sourceShard : sourceShards ) {
+ Iterator<MarkedEdge> edges = edgeMeta.loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope,
+ Collections.singleton( sourceShard ), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING );
+ while ( edges.hasNext() ) {
+ final MarkedEdge edge = edges.next();
- @Override
- public AuditResult evaluateShardGroup( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
- final ShardEntryGroup group ) {
+ final long edgeTimestamp = edge.getTimestamp();
+ /**
+ * The edge is within a different shard, break
+ */
+ if ( edgeTimestamp < newShardPivot ) {
+ break;
+ }
- final double repairChance = random.nextDouble();
- //don't repair
- if ( repairChance > graphFig.getShardRepairChance() ) {
- return AuditResult.NOT_CHECKED;
+ newRowBatch.mergeShallow(
+ edgeMeta.writeEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, targetShard, edge,
+ timestamp ) );
+
+ deleteRowBatch.mergeShallow(
+ edgeMeta.deleteEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, sourceShard, edge,
+ timestamp ) );
+
+ edgeCount++;
+
+ //if we're at our count, execute the mutation of writing the edges to the new row, then remove them
+ //from the old rows
+ if ( edgeCount % maxWorkSize == 0 ) {
+
+ try {
+ HystrixCassandra.async( newRowBatch );
+ HystrixCassandra.async( deleteRowBatch );
+ }
+ catch ( Throwable t ) {
+ LOG.error( "Unable to move edges from shard {} to shard {}", sourceShard, targetShard );
+ }
+ }
+ }
}
+ try {
+ HystrixCassandra.async( newRowBatch );
+ HystrixCassandra.async( deleteRowBatch );
+ }
+ catch ( Throwable t ) {
+ LOG.error( "Unable to move edges to target shard {}", targetShard );
+ }
+
+
+ LOG.info( "Finished compacting {} shards and moved {} edges", sourceShards, edgeCount );
+
+ resultBuilder.withCopiedEdges( edgeCount ).withSourceShards( sourceShards ).withTargetShard( targetShard );
+
/**
- * We don't have a compaction pending. Run an audit on the shards
+ * We didn't move anything this pass, mark the shard as compacted. If we move something, it means that we missed it on the first pass
+ * or someone is still not writing to the target shard only.
*/
- if ( !group.isCompactionPending() ) {
+ if ( edgeCount == 0 ) {
- /**
- * Check if we should allocate, we may want to
- */
+ //now that we've marked our target as compacted, we can successfully remove any shards that are not
+ // compacted themselves in the sources
+
+ final MutationBatch shardRemovalRollup = keyspace.prepareMutationBatch();
+
+ for ( Shard source : sourceShards ) {
+
+ //if we can't safely delete it, don't do so
+ if ( !group.canBeDeleted( source ) ) {
+ continue;
+ }
- final boolean created = nodeShardAllocation.auditShard( scope, group, edgeMeta );
+ LOG.info( "Source shards have been fully drained. Removing shard {}", source );
+ final MutationBatch shardRemoval = edgeShardSerialization.removeShardMeta( scope, source, edgeMeta );
+ shardRemovalRollup.mergeShallow( shardRemoval );
- if ( !created ) {
- return AuditResult.CHECKED_NO_OP;
+ resultBuilder.withRemovedShard( source );
}
- return AuditResult.CHECKED_CREATED;
+ HystrixCassandra.async( shardRemovalRollup );
+
+
+ LOG.info( "Shard has been fully compacted. Marking shard {} as compacted in Cassandra", targetShard );
+
+ //Overwrite our shard index with a newly created one that has been marked as compacted
+ Shard compactedShard = new Shard( targetShard.getShardIndex(), timeService.getCurrentTime(), true );
+ final MutationBatch updateMark = edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta );
+ HystrixCassandra.async( updateMark );
+
+ resultBuilder.withCompactedShard( compactedShard );
}
- //check our taskmanager
+ return resultBuilder.build();
+ }
+ @Override
+ public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
+ final DirectedEdgeMeta edgeMeta,
+ final ShardEntryGroup group ) {
+
+ final double repairChance = random.nextDouble();
+
+
+ //don't audit, we didn't hit our chance
+ if ( repairChance > graphFig.getShardRepairChance() ) {
+ return Futures.immediateFuture( AuditResult.NOT_CHECKED );
+ }
+
/**
- * Do the compaction
+ * Try and submit. During back pressure, we may not be able to submit, that's ok. Better to drop than to
+ * hose the system
*/
- if ( group.shouldCompact( timeService.getCurrentTime() ) ) {
- compact( scope, edgeMeta, group );
- return AuditResult.COMPACTING;
- }
+ ListenableFuture<AuditResult> future = executorService.submit( new Callable<AuditResult>() {
+ @Override
+ public AuditResult call() throws Exception {
+
+
+ /**
+ * We don't have a compaction pending. Run an audit on the shards
+ */
+ if ( !group.isCompactionPending() ) {
+
+ /**
+ * Check if we should allocate, we may want to
+ */
+
+ /**
+ * It's already compacting, don't do anything
+ */
+ if ( !shardAuditTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
+ return AuditResult.CHECKED_NO_OP;
+ }
+
+ try {
+
+ final boolean created = nodeShardAllocation.auditShard( scope, group, edgeMeta );
+ if ( !created ) {
+ return AuditResult.CHECKED_NO_OP;
+ }
+ }
+ finally {
+ shardAuditTaskTracker.complete( scope, edgeMeta, group );
+ }
+
+
+ return AuditResult.CHECKED_CREATED;
+ }
+
+ //check our taskmanager
+
+
+ /**
+ * Do the compaction
+ */
+ if ( group.shouldCompact( timeService.getCurrentTime() ) ) {
+ /**
+ * It's already compacting, don't do anything
+ */
+ if ( !shardCompactionTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
+ return AuditResult.COMPACTING;
+ }
+
+ /**
+ * We use a finally b/c we always want to remove the task track
+ */
+ try {
+ CompactionResult result = compact( scope, edgeMeta, group );
+ LOG.info( "Compaction result for compaction of scope {} with edge meta data of {} and shard group {} is {}", new Object[]{scope, edgeMeta, group, result} );
+ }
+ finally {
+ shardCompactionTaskTracker.complete( scope, edgeMeta, group );
+ }
+ return AuditResult.COMPACTED;
+ }
+
+ //no op, there's nothing we need to do to this shard
+ return AuditResult.NOT_CHECKED;
+ }
+ } );
+
+ /**
+ * Log our success or failures for debugging purposes
+ */
+ Futures.addCallback( future, new FutureCallback<AuditResult>() {
+ @Override
+ public void onSuccess( @Nullable final AuditResult result ) {
+ LOG.debug( "Successfully completed audit of task {}", result );
+ }
- //no op, there's nothing we need to do to this shard
- return AuditResult.NOT_CHECKED;
+
+ @Override
+ public void onFailure( final Throwable t ) {
+ LOG.error( "Unable to perform audit. Exception is ", t );
+ }
+ } );
+
+ return future;
}
- private static final class ShardCompactionTaskTracker {
- private BitSet runningTasks = new BitSet();
+ /**
+ * Inner class used to track running tasks per instance
+ */
+ private static abstract class TaskTracker {
+
+ private static final Boolean TRUE = true;
+
+ private ConcurrentHashMap<Long, Boolean> runningTasks = new ConcurrentHashMap<>();
/**
* Sets this data into our scope to signal it's running to stop other threads from attempting to run
- * @param scope
- * @param edgeMeta
- * @param group
- * @return
*/
- public boolean shouldStartCompaction( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+ public boolean canStartTask( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
ShardEntryGroup group ) {
- final int hash = doHash( scope, edgeMeta, group ).asInt();
+ final Long hash = doHash( scope, edgeMeta, group ).hash().asLong();
- if(runningTasks.get( hash )){
- return false;
- }
-
- runningTasks.set( hash );
+ final Boolean returned = runningTasks.putIfAbsent( hash, TRUE );
- return true;
+ /**
+ * Someone already put the value
+ */
+ return returned == null;
}
/**
* Mark this entry group as complete
- * @param scope
- * @param edgeMeta
- * @param group
*/
- public void complete( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
- ShardEntryGroup group ) {
- final int hash = doHash( scope, edgeMeta, group ).asInt();
- runningTasks.clear( hash );
+ public void complete( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, ShardEntryGroup group ) {
+ final long hash = doHash( scope, edgeMeta, group ).hash().asLong();
+ runningTasks.remove( hash );
}
+ protected abstract Hasher doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta,
+ final ShardEntryGroup shardEntryGroup );
+ }
+
+
+ /**
+ * Task tracker for shard compaction
+ */
+ private static final class ShardCompactionTaskTracker extends ShardAuditTaskTracker {
+
/**
* Hash our data into a consistent long
- * @param scope
- * @param directedEdgeMeta
- * @param shardEntryGroup
- * @return
*/
- private HashCode doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta,
+ protected Hasher doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta,
+ final ShardEntryGroup shardEntryGroup ) {
+
+ final Hasher hasher = super.doHash( scope, directedEdgeMeta, shardEntryGroup );
+
+ //add our compaction target to the hash
+ final Shard compactionTarget = shardEntryGroup.getCompactionTarget();
+
+ hasher.putLong( compactionTarget.getShardIndex() );
+
+
+ return hasher;
+ }
+ }
+
+
+ /**
+ * Task tracker for shard audit
+ */
+ private static class ShardAuditTaskTracker extends TaskTracker {
+
+ /**
+ * Hash our data into a consistent long
+ */
+ protected Hasher doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta,
final ShardEntryGroup shardEntryGroup ) {
final Hasher hasher = MURMUR_128.newHasher();
@@ -273,35 +478,141 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
}
//add our compaction target to the hash
- final Shard compactionTarget = shardEntryGroup.getCompactionTarget();
- hasher.putLong( compactionTarget.getShardIndex() );
-
- return hasher.hash();
+ return hasher;
}
- private void addToHash( final Hasher hasher, final Id id ) {
+ protected void addToHash( final PrimitiveSink into, final Id id ) {
final UUID nodeUuid = id.getUuid();
final String nodeType = id.getType();
- hasher.putLong( nodeUuid.getMostSignificantBits() ).putLong( nodeUuid.getLeastSignificantBits() )
- .putString( nodeType, CHARSET );
+ into.putLong( nodeUuid.getMostSignificantBits() ).putLong( nodeUuid.getLeastSignificantBits() )
+ .putString( nodeType, CHARSET );
+ }
+ }
+
+
+ /**
+ * Create a thread pool that will reject work if our audit tasks become overwhelmed
+ */
+ private final class MaxSizeThreadPool extends ThreadPoolExecutor {
+
+ public MaxSizeThreadPool( final int workerSize, final int queueLength ) {
+ super( 1, workerSize, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>( queueLength ),
+ new CompactionThreadFactory(), new RejectionLogger() );
+ }
+ }
+
+
+ private final class CompactionThreadFactory implements ThreadFactory {
+
+ private final AtomicLong threadCounter = new AtomicLong();
+
+
+ @Override
+ public Thread newThread( final Runnable r ) {
+ final long newValue = threadCounter.incrementAndGet();
+
+ return new Thread( r, "Graph-Shard-Compaction-" + newValue );
}
}
- private enum StartResult{
- /**
- * Returned if the compaction was started
- */
- STARTED,
+ private final class RejectionLogger implements RejectedExecutionHandler {
+
+
+ @Override
+ public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
+ LOG.warn( "Audit queue full, rejecting audit task {}", r );
+ }
+ }
+
+
+
+ public static final class CompactionResult {
+
+ public final long copiedEdges;
+ public final Shard targetShard;
+ public final Set<Shard> sourceShards;
+ public final Set<Shard> removedShards;
+ public final Shard compactedShard;
+
+
+
+ private CompactionResult( final long copiedEdges, final Shard targetShard, final Set<Shard> sourceShards,
+ final Set<Shard> removedShards, final Shard compactedShard ) {
+ this.copiedEdges = copiedEdges;
+ this.targetShard = targetShard;
+ this.compactedShard = compactedShard;
+ this.sourceShards = Collections.unmodifiableSet( sourceShards );
+ this.removedShards = Collections.unmodifiableSet( removedShards );
+ }
+
/**
- * Returned if we are running the compaction
+ * Create a builder to use to create the result
*/
- RUNNING;
+ public static CompactionBuilder builder() {
+ return new CompactionBuilder();
+ }
+
+
+ @Override
+ public String toString() {
+ return "CompactionResult{" +
+ "copiedEdges=" + copiedEdges +
+ ", targetShard=" + targetShard +
+ ", sourceShards=" + sourceShards +
+ ", removedShards=" + removedShards +
+ ", compactedShard=" + compactedShard +
+ '}';
+ }
+
+
+ public static final class CompactionBuilder {
+ private long copiedEdges;
+ private Shard targetShard;
+ private Set<Shard> sourceShards;
+ private Set<Shard> removedShards = new HashSet<>();
+ private Shard compactedShard;
+
+
+ public CompactionBuilder withCopiedEdges( final long copiedEdges ) {
+ this.copiedEdges = copiedEdges;
+ return this;
+ }
+
+
+ public CompactionBuilder withTargetShard( final Shard targetShard ) {
+ this.targetShard = targetShard;
+ return this;
+ }
+
+
+ public CompactionBuilder withSourceShards( final Set<Shard> sourceShards ) {
+ this.sourceShards = sourceShards;
+ return this;
+ }
+
+
+ public CompactionBuilder withRemovedShard( final Shard removedShard ) {
+ this.removedShards.add( removedShard );
+ return this;
+ }
+
+
+ public CompactionBuilder withCompactedShard( final Shard compactedShard ) {
+ this.compactedShard = compactedShard;
+ return this;
+ }
+
+
+ public CompactionResult build() {
+ return new CompactionResult( copiedEdges, targetShard, sourceShards, removedShards, compactedShard );
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
index 030e4a7..b0523e1 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
@@ -22,9 +22,11 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
import java.util.Collection;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.UUID;
+import javax.annotation.Nullable;
import javax.inject.Inject;
import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
@@ -49,9 +51,16 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.DescendingTimestampComparator;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.OrderedComparator;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators
+ .SourceDirectedEdgeDescendingComparator;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators
+ .TargetDirectedEdgeDescendingComparator;
import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
import org.apache.usergrid.persistence.model.entity.Id;
+import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
@@ -256,6 +265,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
final Shard shard, final boolean isDeleted ) {
batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( edge );
+ writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
}
}.createBatch( scope, shards, timestamp );
}
@@ -278,6 +288,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
batch.withRow( columnFamilies.getSourceNodeTargetTypeCfName(), ScopedRowKey.fromKey( scope, rowKey ) )
.deleteColumn( edge );
+ writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
}
}.createBatch( scope, shards, timestamp );
}
@@ -297,6 +308,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
final Shard shard, final boolean isDeleted ) {
batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( edge );
+ writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
}
}.createBatch( scope, shards, timestamp );
}
@@ -319,6 +331,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
batch.withRow( columnFamilies.getTargetNodeSourceTypeCfName(), ScopedRowKey.fromKey( scope, rowKey ) )
.deleteColumn( edge );
+ writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
}
}.createBatch( scope, shards, timestamp );
}
@@ -338,6 +351,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
final boolean isDeleted ) {
batch.withRow( columnFamilies.getGraphEdgeVersions(), ScopedRowKey.fromKey( scope, rowKey ) )
.deleteColumn( column );
+ writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
}
}.createBatch( scope, shards, timestamp );
}
@@ -357,8 +371,16 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
columnFamilies.getGraphEdgeVersions();
final Serializer<Long> serializer = columnFamily.getColumnSerializer();
+
+ final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( DescendingTimestampComparator.INSTANCE, search.getOrder());
+
+
+
+
final EdgeSearcher<EdgeRowKey, Long, MarkedEdge> searcher =
- new EdgeSearcher<EdgeRowKey, Long, MarkedEdge>( scope, maxTimestamp, search.last(), shards ) {
+ new EdgeSearcher<EdgeRowKey, Long, MarkedEdge>( scope, shards, search.getOrder(), comparator, maxTimestamp,
+ search.last().transform( TRANSFORM ) ) {
+
@Override
protected Serializer<Long> getSerializer() {
@@ -367,11 +389,11 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
@Override
- public void setRange( final RangeBuilder builder ) {
+ public void buildRange( final RangeBuilder builder ) {
if ( last.isPresent() ) {
- super.setRange( builder );
+ super.buildRange( builder );
return;
}
@@ -387,7 +409,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
@Override
- protected Long getStartColumn( final Edge last ) {
+ protected Long createColumn( final MarkedEdge last ) {
return last.getTimestamp();
}
@@ -398,10 +420,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
}
- @Override
- public int compare( final MarkedEdge o1, final MarkedEdge o2 ) {
- return Long.compare( o1.getTimestamp(), o2.getTimestamp() );
- }
+
};
return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
@@ -411,23 +430,27 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
@Override
public Iterator<MarkedEdge> getEdgesFromSource( final EdgeColumnFamilies columnFamilies,
- final ApplicationScope scope, final SearchByEdgeType edgeType,
+ final ApplicationScope scope, final SearchByEdgeType search,
final Collection<Shard> shards ) {
ValidationUtils.validateApplicationScope( scope );
- GraphValidation.validateSearchByEdgeType( edgeType );
+ GraphValidation.validateSearchByEdgeType( search );
- final Id sourceId = edgeType.getNode();
- final String type = edgeType.getType();
- final long maxTimestamp = edgeType.getMaxTimestamp();
+ final Id sourceId = search.getNode();
+ final String type = search.getType();
+ final long maxTimestamp = search.getMaxTimestamp();
final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily =
columnFamilies.getSourceNodeCfName();
final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
- final SourceEdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
- new SourceEdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
- shards ) {
+ final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( TargetDirectedEdgeDescendingComparator.INSTANCE, search.getOrder());
+
+
+
+ final EdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
+ new EdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(), comparator, maxTimestamp,
+ search.last().transform( TRANSFORM ) ) {
@Override
@@ -443,7 +466,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
@Override
- protected DirectedEdge getStartColumn( final Edge last ) {
+ protected DirectedEdge createColumn( final MarkedEdge last ) {
return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
}
@@ -463,24 +486,26 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
@Override
public Iterator<MarkedEdge> getEdgesFromSourceByTargetType( final EdgeColumnFamilies columnFamilies,
final ApplicationScope scope,
- final SearchByIdType edgeType,
+ final SearchByIdType search,
final Collection<Shard> shards ) {
ValidationUtils.validateApplicationScope( scope );
- GraphValidation.validateSearchByEdgeType( edgeType );
+ GraphValidation.validateSearchByEdgeType( search );
- final Id targetId = edgeType.getNode();
- final String type = edgeType.getType();
- final String targetType = edgeType.getIdType();
- final long maxTimestamp = edgeType.getMaxTimestamp();
+ final Id targetId = search.getNode();
+ final String type = search.getType();
+ final String targetType = search.getIdType();
+ final long maxTimestamp = search.getMaxTimestamp();
final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily =
columnFamilies.getSourceNodeTargetTypeCfName();
final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
+ final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( TargetDirectedEdgeDescendingComparator.INSTANCE, search.getOrder());
+
- final SourceEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
- new SourceEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
- shards ) {
+ final EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
+ new EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(), comparator, maxTimestamp,
+ search.last().transform( TRANSFORM ) ) {
@Override
protected Serializer<DirectedEdge> getSerializer() {
@@ -495,7 +520,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
@Override
- protected DirectedEdge getStartColumn( final Edge last ) {
+ protected DirectedEdge createColumn( final MarkedEdge last ) {
return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
}
@@ -513,20 +538,22 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
@Override
public Iterator<MarkedEdge> getEdgesToTarget( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
- final SearchByEdgeType edgeType, final Collection<Shard> shards ) {
+ final SearchByEdgeType search, final Collection<Shard> shards ) {
ValidationUtils.validateApplicationScope( scope );
- GraphValidation.validateSearchByEdgeType( edgeType );
+ GraphValidation.validateSearchByEdgeType( search );
- final Id targetId = edgeType.getNode();
- final String type = edgeType.getType();
- final long maxTimestamp = edgeType.getMaxTimestamp();
+ final Id targetId = search.getNode();
+ final String type = search.getType();
+ final long maxTimestamp = search.getMaxTimestamp();
final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily =
columnFamilies.getTargetNodeCfName();
final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
- final TargetEdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
- new TargetEdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
- shards ) {
+ final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( SourceDirectedEdgeDescendingComparator.INSTANCE, search.getOrder());
+
+ final EdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
+ new EdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(),comparator, maxTimestamp,
+ search.last().transform( TRANSFORM ) ) {
@Override
protected Serializer<DirectedEdge> getSerializer() {
@@ -541,7 +568,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
@Override
- protected DirectedEdge getStartColumn( final Edge last ) {
+ protected DirectedEdge createColumn( final MarkedEdge last ) {
return new DirectedEdge( last.getSourceNode(), last.getTimestamp() );
}
@@ -561,24 +588,26 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
@Override
public Iterator<MarkedEdge> getEdgesToTargetBySourceType( final EdgeColumnFamilies columnFamilies,
final ApplicationScope scope,
- final SearchByIdType edgeType,
+ final SearchByIdType search,
final Collection<Shard> shards ) {
ValidationUtils.validateApplicationScope( scope );
- GraphValidation.validateSearchByEdgeType( edgeType );
+ GraphValidation.validateSearchByEdgeType( search );
- final Id targetId = edgeType.getNode();
- final String sourceType = edgeType.getIdType();
- final String type = edgeType.getType();
- final long maxTimestamp = edgeType.getMaxTimestamp();
+ final Id targetId = search.getNode();
+ final String sourceType = search.getIdType();
+ final String type = search.getType();
+ final long maxTimestamp = search.getMaxTimestamp();
final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily =
columnFamilies.getTargetNodeSourceTypeCfName();
final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
+ final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( SourceDirectedEdgeDescendingComparator.INSTANCE, search.getOrder());
+
- final TargetEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
- new TargetEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
- shards ) {
+ final EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
+ new EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(), comparator, maxTimestamp,
+ search.last().transform( TRANSFORM ) ) {
@Override
protected Serializer<DirectedEdge> getSerializer() {
return serializer;
@@ -592,7 +621,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
@Override
- protected DirectedEdge getStartColumn( final Edge last ) {
+ protected DirectedEdge createColumn( final MarkedEdge last ) {
return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
}
@@ -608,51 +637,8 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
}
- /**
- * Class for performing searched on rows based on source id
- */
- private static abstract class SourceEdgeSearcher<R, C, T extends Edge> extends EdgeSearcher<R, C, T> {
-
- protected SourceEdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional<Edge> last,
- final Collection<Shard> shards ) {
- super( scope, maxTimestamp, last, shards );
- }
- public int compare( final T o1, final T o2 ) {
- int compare = Long.compare( o1.getTimestamp(), o2.getTimestamp() );
-
- if ( compare == 0 ) {
- compare = o1.getTargetNode().compareTo( o2.getTargetNode() );
- }
-
- return compare;
- }
- }
-
-
- /**
- * Class for performing searched on rows based on target id
- */
- private static abstract class TargetEdgeSearcher<R, C, T extends Edge> extends EdgeSearcher<R, C, T> {
-
- protected TargetEdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional<Edge> last,
- final Collection<Shard> shards ) {
- super( scope, maxTimestamp, last, shards );
- }
-
-
- public int compare( final T o1, final T o2 ) {
- int compare = Long.compare( o1.getTimestamp(), o2.getTimestamp() );
-
- if ( compare == 0 ) {
- compare = o1.getTargetNode().compareTo( o2.getTargetNode() );
- }
-
- return compare;
- }
- }
-
/**
* Simple callback to perform puts and deletes with a common row setup code
@@ -994,4 +980,27 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
return isDeleted;
}
}
+
+
+ /**
+ * Converts an Edge into a MarkedEdge: null stays null, a MarkedEdge is returned as-is, and any other
+ * edge is copied into a SimpleMarkedEdge with false as its last argument (presumably "not deleted" - confirm).
+ */
+ private static final Function<Edge, MarkedEdge> TRANSFORM = new Function<Edge, MarkedEdge>() {
+ @Nullable
+ @Override
+ public MarkedEdge apply( @Nullable final Edge input ) {
+
+ if ( input == null ) {
+ return null;
+ }
+
+ if ( input instanceof MarkedEdge ) {
+ return ( MarkedEdge ) input;
+ }
+
+ return new SimpleMarkedEdge( input.getSourceNode(), input.getType(), input.getTargetNode(),
+ input.getTimestamp(), false );
+ }
+ };
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
index 0c7e5b5..d99d98b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
@@ -8,6 +8,7 @@ import java.util.NoSuchElementException;
import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
import org.apache.usergrid.persistence.core.astyanax.MultiKeyColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -86,6 +87,7 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T> {
*/
private void startIterator() {
+
/**
* If the edge is present, we need to being seeking from this
*/
@@ -94,7 +96,9 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T> {
//set the range into the search
- searcher.setRange( rangeBuilder );
+ searcher.buildRange( rangeBuilder );
+
+
/**
* Get our list of slices
@@ -102,27 +106,22 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T> {
final List<ScopedRowKey<ApplicationScope, R>> rowKeys = searcher.getRowKeys();
- final List<ColumnNameIterator<C, T>> columnNameIterators = new ArrayList<>( rowKeys.size() );
-
- for(ScopedRowKey<ApplicationScope, R> rowKey: rowKeys){
-
+ if(rowKeys.size() == 1){
+ final RowQuery<ScopedRowKey<ApplicationScope, R>, C> query =
+ keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKey( rowKeys.get( 0 ) )
+ .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
- final RowQuery<ScopedRowKey<ApplicationScope, R>, C> query =
- keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKey( rowKey )
- .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
-
-
- final ColumnNameIterator<C, T> columnNameIterator = new ColumnNameIterator<>( query, searcher, searcher.hasPage() );
+ currentColumnIterator = new ColumnNameIterator<>( query, searcher, searcher.hasPage() );
+ }
- columnNameIterators.add( columnNameIterator );
+ else{
+ currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher, searcher.getComparator(), rowKeys, pageSize);
}
- currentColumnIterator = new MultiKeyColumnNameIterator<>(columnNameIterators, searcher, pageSize);
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
index 9050b0a..ddd514b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
@@ -37,6 +37,10 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumn
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeRowKeySerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.RowSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.RowTypeSerializer;
import com.netflix.astyanax.serializers.LongSerializer;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DescendingTimestampComparator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DescendingTimestampComparator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DescendingTimestampComparator.java
new file mode 100644
index 0000000..6e52dd5
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DescendingTimestampComparator.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators;
+
+
+import java.util.Comparator;
+
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+
+
+/**
+ * Orders edges by timestamp from high to low. The edge with the higher timestamp is "less than"
+ * the edge with the lower timestamp, so it sorts first.
+ */
+public class DescendingTimestampComparator implements Comparator<MarkedEdge> {
+    public static final DescendingTimestampComparator INSTANCE = new DescendingTimestampComparator();
+
+    @Override
+    public int compare( final MarkedEdge o1, final MarkedEdge o2 ) {
+        final long firstTimestamp = o1.getTimestamp();
+        final long secondTimestamp = o2.getTimestamp();
+        //negate the natural (ascending) comparison so the higher timestamp comes first
+        return -Long.compare( firstTimestamp, secondTimestamp );
+    }
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DirectedEdgeDescendingComparator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DirectedEdgeDescendingComparator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DirectedEdgeDescendingComparator.java
new file mode 100644
index 0000000..6ff65cb
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DirectedEdgeDescendingComparator.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators;
+
+
+import java.util.Comparator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.fasterxml.uuid.impl.UUIDUtil;
+
+
+/**
+ * Comparator that orders edges in descending order. Timestamps are compared first, and the
+ * higher timestamp is considered "less" so that it comes first. When timestamps are equal, the
+ * uuids of the Ids returned by getId are compared, again descending. When those are also equal,
+ * the Id types are compared in ascending order.
+ */
+public abstract class DirectedEdgeDescendingComparator implements Comparator<MarkedEdge> {
+
+    @Override
+    public int compare( final MarkedEdge first, final MarkedEdge second ) {
+        //negate so that the higher timestamp sorts first
+        final int byTimestamp = -Long.compare( first.getTimestamp(), second.getTimestamp() );
+
+        if ( byTimestamp != 0 ) {
+            return byTimestamp;
+        }
+
+        final Id firstId = getId( first );
+        final Id secondId = getId( second );
+
+        //negate so that uuids sort descending as well
+        final int byUuid = -UUIDComparator.staticCompare( firstId.getUuid(), secondId.getUuid() );
+
+        return byUuid != 0 ? byUuid : firstId.getType().compareTo( secondId.getType() );
+    }
+
+
+    /**
+     * Select the Id of the edge that takes part in the comparison
+     *
+     * @param edge The edge being compared
+     * @return The Id whose uuid and type are compared
+     */
+    protected abstract Id getId(final MarkedEdge edge);
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/OrderedComparator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/OrderedComparator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/OrderedComparator.java
new file mode 100644
index 0000000..003ed36
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/OrderedComparator.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators;
+
+
+import java.util.Comparator;
+
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+
+
+/**
+ * Comparator that applies a delegate in forward or reverse order, based on the order type specified.
+ *
+ * DESCENDING uses the delegate's own order. Any other order (ASCENDING) reverses it by swapping the
+ * arguments, so a delegate result of Integer.MIN_VALUE cannot overflow as it would when negated.
+ */
+public class OrderedComparator<T> implements Comparator<T> {
+    /** True when the delegate's order must be reversed, i.e. the order is not DESCENDING. */
+    private final boolean reversed;
+    private final Comparator<T> delegate;
+
+
+    public OrderedComparator( final Comparator<T> delegate, final SearchByEdgeType.Order order ) {
+        this.reversed = order != SearchByEdgeType.Order.DESCENDING;
+        this.delegate = delegate;
+    }
+
+
+    @Override
+    public int compare( final T o1, final T o2 ) {
+        //swap the operands instead of multiplying by -1: -Integer.MIN_VALUE is still negative
+        return reversed ? delegate.compare( o2, o1 ) : delegate.compare( o1, o2 );
+    }
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/SourceDirectedEdgeDescendingComparator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/SourceDirectedEdgeDescendingComparator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/SourceDirectedEdgeDescendingComparator.java
new file mode 100644
index 0000000..f067006
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/SourceDirectedEdgeDescendingComparator.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators;
+
+
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Comparator that uses the source node Id of each edge for the Id portion of the comparison.
+ * The comparison logic itself is inherited from DirectedEdgeDescendingComparator; this class
+ * only selects which Id is compared.
+ */
+public class SourceDirectedEdgeDescendingComparator extends DirectedEdgeDescendingComparator {
+
+ public static final SourceDirectedEdgeDescendingComparator INSTANCE = new SourceDirectedEdgeDescendingComparator();
+
+ @Override
+ protected Id getId( final MarkedEdge edge ) {
+ return edge.getSourceNode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/TargetDirectedEdgeDescendingComparator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/TargetDirectedEdgeDescendingComparator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/TargetDirectedEdgeDescendingComparator.java
new file mode 100644
index 0000000..115a874
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/TargetDirectedEdgeDescendingComparator.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators;
+
+
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Comparator that uses the target node Id of each edge for the Id portion of the comparison.
+ * The comparison logic itself is inherited from DirectedEdgeDescendingComparator; this class
+ * only selects which Id is compared.
+ */
+public class TargetDirectedEdgeDescendingComparator extends DirectedEdgeDescendingComparator {
+
+ public static final TargetDirectedEdgeDescendingComparator INSTANCE = new TargetDirectedEdgeDescendingComparator();
+
+ @Override
+ protected Id getId( final MarkedEdge edge ) {
+ return edge.getTargetNode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeRowKeySerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeRowKeySerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeRowKeySerializer.java
new file mode 100644
index 0000000..f1ae90b
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeRowKeySerializer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize;
+
+
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+
+
+/**
+ * Class to perform serialization for row keys from edges
+ */
+
+public class EdgeRowKeySerializer implements CompositeFieldSerializer<EdgeRowKey> {
+
+ private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+
+ @Override
+ public void toComposite( final CompositeBuilder builder, final EdgeRowKey key ) {
+
+ //add the row id to the composite
+ ID_SER.toComposite( builder, key.sourceId );
+ builder.addString( key.edgeType );
+ ID_SER.toComposite( builder, key.targetId );
+ builder.addLong( key.shardId );
+ }
+
+
+ @Override
+ public EdgeRowKey fromComposite( final CompositeParser composite ) {
+
+ final Id sourceId = ID_SER.fromComposite( composite );
+ final String edgeType = composite.readString();
+ final Id targetId = ID_SER.fromComposite( composite );
+ final long shard = composite.readLong();
+
+ return new EdgeRowKey( sourceId, edgeType, targetId, shard );
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeSerializer.java
new file mode 100644
index 0000000..590cf35
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeSerializer.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize;
+
+
+import java.nio.ByteBuffer;
+
+import org.apache.usergrid.persistence.core.astyanax.IdColDynamicCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Preconditions;
+import com.netflix.astyanax.model.DynamicComposite;
+import com.netflix.astyanax.serializers.AbstractSerializer;
+import com.netflix.astyanax.serializers.LongSerializer;
+
+
+/**
+ * Serializes to a source->target edge. Note that we cannot set the edge type on de-serialization, only the
+ * Id and timestamp.
+ */
+public class EdgeSerializer extends AbstractSerializer<DirectedEdge> {
+
+ private static final IdColDynamicCompositeSerializer ID_COL_SERIALIZER = IdColDynamicCompositeSerializer.get();
+ private static final LongSerializer LONG_SERIALIZER = LongSerializer.get();
+
+
+ @Override
+ public ByteBuffer toByteBuffer( final DirectedEdge edge ) {
+
+ DynamicComposite composite = new DynamicComposite();
+
+ composite.addComponent( edge.timestamp, LONG_SERIALIZER );
+
+ ID_COL_SERIALIZER.toComposite( composite, edge.id );
+
+ return composite.serialize();
+ }
+
+
+ @Override
+ public DirectedEdge fromByteBuffer( final ByteBuffer byteBuffer ) {
+ DynamicComposite composite = DynamicComposite.fromByteBuffer( byteBuffer );
+
+ Preconditions.checkArgument( composite.size() == 3, "Composite should have 3 elements" );
+
+
+ //read the timestamp
+ final long timestamp = composite.get( 0, LONG_SERIALIZER );
+
+
+ //parse our id
+ final Id id = ID_COL_SERIALIZER.fromComposite( composite, 1 );
+
+
+ return new DirectedEdge( id, timestamp );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeShardRowKeySerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeShardRowKeySerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeShardRowKeySerializer.java
new file mode 100644
index 0000000..6b1c4e9
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeShardRowKeySerializer.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize;
+
+
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+
+
+public class EdgeShardRowKeySerializer implements CompositeFieldSerializer<DirectedEdgeMeta> {
+
+ private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+ public static final EdgeShardRowKeySerializer INSTANCE = new EdgeShardRowKeySerializer();
+
+
+ @Override
+ public void toComposite( final CompositeBuilder builder, final DirectedEdgeMeta meta ) {
+
+
+ final DirectedEdgeMeta.NodeMeta[] nodeMeta = meta.getNodes();
+
+ //add the storage value of the meta type
+ builder.addInteger( meta.getType().getStorageValue() );
+
+ final int length = nodeMeta.length;
+
+ builder.addInteger( length );
+
+
+ for ( DirectedEdgeMeta.NodeMeta node : nodeMeta ) {
+ ID_SER.toComposite( builder, node.getId() );
+ builder.addInteger( node.getNodeType().getStorageValue() );
+ }
+
+ final String[] edgeTypes = meta.getTypes();
+
+ builder.addInteger( edgeTypes.length );
+
+ for ( String type : edgeTypes ) {
+ builder.addString( type );
+ }
+ }
+
+
+ @Override
+ public DirectedEdgeMeta fromComposite( final CompositeParser composite ) {
+
+
+ final int storageType = composite.readInteger();
+
+ final DirectedEdgeMeta.MetaType metaType = DirectedEdgeMeta.MetaType.fromStorage( storageType );
+
+ final int idLength = composite.readInteger();
+
+ final DirectedEdgeMeta.NodeMeta[] nodePairs = new DirectedEdgeMeta.NodeMeta[idLength];
+
+
+ for ( int i = 0; i < idLength; i++ ) {
+ final Id sourceId = ID_SER.fromComposite( composite );
+
+ final NodeType type = NodeType.get( composite.readInteger() );
+
+ nodePairs[i] = new DirectedEdgeMeta.NodeMeta( sourceId, type );
+ }
+
+
+ final int length = composite.readInteger();
+
+ String[] types = new String[length];
+
+ for ( int i = 0; i < length; i++ ) {
+ types[i] = composite.readString();
+ }
+
+ return DirectedEdgeMeta.fromStorage( metaType, nodePairs, types );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowSerializer.java
new file mode 100644
index 0000000..8376ef1
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowSerializer.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize;
+
+
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+
+
+/**
+ * Class to perform serialization for row keys from edges
+ */
+public class RowSerializer implements CompositeFieldSerializer<RowKey> {
+
+ private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+
+ @Override
+ public void toComposite( final CompositeBuilder builder, final RowKey key ) {
+
+ //add the row id to the composite
+ ID_SER.toComposite( builder, key.nodeId );
+
+ builder.addString( key.edgeType );
+ builder.addLong( key.shardId );
+ }
+
+
+ @Override
+ public RowKey fromComposite( final CompositeParser composite ) {
+
+ final Id id = ID_SER.fromComposite( composite );
+ final String edgeType = composite.readString();
+ final long shard = composite.readLong();
+
+
+ return new RowKey( id, edgeType, shard );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowTypeSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowTypeSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowTypeSerializer.java
new file mode 100644
index 0000000..a67c469
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowTypeSerializer.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize;
+
+
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+
+
+public class RowTypeSerializer implements CompositeFieldSerializer<RowKeyType> {
+
+ private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+
+ @Override
+ public void toComposite( final CompositeBuilder builder, final RowKeyType keyType ) {
+
+ //add the row id to the composite
+ ID_SER.toComposite( builder, keyType.nodeId );
+
+ builder.addString( keyType.edgeType );
+ builder.addString( keyType.idType );
+
+ builder.addLong( keyType.shardId );
+ }
+
+
+ @Override
+ public RowKeyType fromComposite( final CompositeParser composite ) {
+
+ final Id id = ID_SER.fromComposite( composite );
+ final String edgeType = composite.readString();
+ final String idType = composite.readString();
+ final long shard = composite.readLong();
+
+ return new RowKeyType( id, edgeType, idType, shard);
+ }
+}