You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/12 19:00:44 UTC
[3/9] git commit: Updated tests and shard logic to be more efficient.
O(1) adds vs O( log n )
Updated tests and shard logic to be more efficient. O(1) adds vs O( log n )
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e9fa368e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e9fa368e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e9fa368e
Branch: refs/heads/USERGRID-188
Commit: e9fa368ef22140254b350e856d71a5fe1c42ef92
Parents: aae3902
Author: Todd Nine <to...@apache.org>
Authored: Tue Aug 5 13:01:04 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Tue Aug 5 13:01:04 2014 -0600
----------------------------------------------------------------------
.../persistence/graph/guice/GraphModule.java | 4 +-
.../graph/serialization/impl/shard/Shard.java | 6 +-
.../impl/shard/ShardEntryGroup.java | 92 +-
.../shard/impl/ShardEntryGroupIterator.java | 30 +-
.../impl/ShardedEdgeSerializationImpl.java | 138 +--
.../graph/ComittedGraphManagerIT.java | 138 ---
.../graph/CommittedGraphManagerIT.java | 138 +++
.../impl/shard/EdgeShardSerializationTest.java | 48 +-
.../impl/shard/NodeShardAllocationTest.java | 1103 +++++++++---------
.../impl/shard/ShardEntryGroupTest.java | 198 +++-
.../shard/impl/ShardEntryGroupIteratorTest.java | 260 +++++
11 files changed, 1298 insertions(+), 857 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9fa368e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 3089889..ca7c270 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -151,13 +151,15 @@ public class GraphModule extends AbstractModule {
public EdgeSerialization storageSerialization( final NodeShardCache cache, final Keyspace keyspace,
final CassandraConfig cassandraConfig, final GraphFig graphFig,
final NodeShardApproximation shardApproximation,
+ final TimeService timeService,
@StorageEdgeSerialization
final EdgeColumnFamilies edgeColumnFamilies ) {
final EdgeShardStrategy sizeBasedStrategy = new SizebasedEdgeShardStrategy( cache, shardApproximation );
- final ShardedEdgeSerialization serialization = new ShardedEdgeSerializationImpl(keyspace, cassandraConfig, graphFig, sizeBasedStrategy);
+ final ShardedEdgeSerialization serialization = new ShardedEdgeSerializationImpl(keyspace, cassandraConfig, graphFig, sizeBasedStrategy,
+ timeService );
final EdgeSerializationImpl edgeSerialization =
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9fa368e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
index 4b58224..38fe51c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
@@ -103,12 +103,15 @@ public class Shard implements Comparable<Shard> {
if ( this == o ) {
return true;
}
- if ( !( o instanceof Shard ) ) {
+ if ( o == null || getClass() != o.getClass() ) {
return false;
}
final Shard shard = ( Shard ) o;
+ if ( compacted != shard.compacted ) {
+ return false;
+ }
if ( createdTime != shard.createdTime ) {
return false;
}
@@ -124,6 +127,7 @@ public class Shard implements Comparable<Shard> {
public int hashCode() {
int result = ( int ) ( shardIndex ^ ( shardIndex >>> 32 ) );
result = 31 * result + ( int ) ( createdTime ^ ( createdTime >>> 32 ) );
+ result = 31 * result + ( compacted ? 1 : 0 );
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9fa368e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
index e39f86b..bea428b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -19,11 +19,15 @@
package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import java.util.TreeSet;
+import com.google.common.base.Preconditions;
+
/**
* There are cases where we need to read or write to more than 1 shard. This object encapsulates a set of shards that
@@ -35,7 +39,7 @@ import java.util.TreeSet;
public class ShardEntryGroup {
- private TreeSet<Shard> shards;
+ private List<Shard> shards;
private final long delta;
@@ -48,8 +52,9 @@ public class ShardEntryGroup {
* The max delta we accept in milliseconds for create time to be considered a member of this group
*/
public ShardEntryGroup( final long delta ) {
+ Preconditions.checkArgument(delta > 0, "delta must be greater than 0");
this.delta = delta;
- this.shards = new TreeSet<>();
+ this.shards = new ArrayList<>();
this.maxCreatedTime = 0;
}
@@ -66,41 +71,21 @@ public class ShardEntryGroup {
*/
public boolean addShard( final Shard shard ) {
- //shards can be ar
-
- //compare the time and see if it falls withing any of the elements based on their timestamp
- // final long shardCreateTime = shard.getCreatedTime();
-
- // final Long lessThanKey = shards.floorKey( shardCreateTime );
- //
- // final Long greaterThanKey = shards.ceilingKey( shardCreateTime );
- //
- // //first into the set
- // if ( lessThanKey == null && greaterThanKey == null ) {
- // addShardInternal( shard );
- // return true;
- // }
- //
- // if ( lessThanKey != null && shardCreateTime - lessThanKey < delta ) {
- // addShardInternal( shard );
- // return true;
- // }
- //
- //
- // if ( greaterThanKey != null && greaterThanKey - shardCreateTime < delta ) {
- // addShardInternal( shard );
- //
- // return true;
- // }
-
- if ( shards.size() == 0 ) {
+ Preconditions.checkNotNull( "shard cannot be null", shard );
+
+ final int size = shards.size();
+
+ if ( size == 0 ) {
addShardInternal( shard );
return true;
}
+ final Shard minShard = shards.get( size -1 );
+
+ Preconditions.checkArgument(minShard.compareTo(shard) > 0, "shard must be less than the current max");
//shard is not compacted, or its predecessor isn't, we should include it in this group
- if ( !shard.isCompacted() || !shards.first().isCompacted() ) {
+ if ( !minShard.isCompacted() ) {
addShardInternal( shard );
return true;
}
@@ -140,7 +125,7 @@ public class ShardEntryGroup {
* The shards in this set can be combined, we should only write to the compaction target to avoid
* adding data to other shards
*/
- if ( shouldCompact( currentTime ) ) {
+ if ( !isTooSmallToCompact() && shouldCompact( currentTime ) ) {
return Collections.singleton( getCompactionTarget() );
}
@@ -160,36 +145,57 @@ public class ShardEntryGroup {
return compactionTarget;
}
+
//we have < 2 shards, we can't compact
- if ( shards.size() < 2 ) {
+ if (isTooSmallToCompact()) {
return null;
}
- Iterator<Shard> descendingIterator = shards.iterator();
- //if we don't have a next, or our "lowest" shard can't be compacted we have no nearest neighbor
- //to use as a bookend. We can't compact.
- if(!descendingIterator.hasNext() || !descendingIterator.next().isCompacted()){
+ final int lastIndex = shards.size() -1;
+
+ final Shard last = shards.get( lastIndex );
+
+ //Our oldest isn't compacted. As a result we have no "bookend" to delimit this entry group. Therefore we can't compact
+ if(!last.isCompacted()){
return null;
}
+ //Start seeking from the end of our group. The first shard we encounter that is not compacted is our compaction target
+ //NOTE: This does not mean we can compact, rather it's just an indication that we have a target set.
+ for(int i = lastIndex - 1; i > -1; i --){
+ final Shard compactionCandidate = shards.get( i );
- //our next should be able to be compacted.
+ if(!compactionCandidate.isCompacted()){
+ compactionTarget = compactionCandidate;
+ break;
+ }
- if(!descendingIterator.hasNext()){
- return null;
}
- //We use this value a lot, cache it
- compactionTarget = descendingIterator.next();
-
return compactionTarget;
}
/**
+ * Return the number of entries in this shard group
+ * @return the number of shards currently held in this group
+ */
+ public int entrySize(){
+ return shards.size();
+ }
+
+ /**
+ * Return true if there are not enough elements in this entry group to consider compaction
+ * @return true when this group holds fewer than 2 shards, false otherwise
+ */
+ private boolean isTooSmallToCompact(){
+ return shards.size() < 2;
+ }
+
+ /**
* Returns true if the newest created shard is past the currentTime - delta
*
* @param currentTime The current system time in milliseconds
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9fa368e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
index 8e69be4..89104e8 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
@@ -24,11 +24,19 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
/**
* Create a shard iterator
- * @param sourceIterator The iterator of all shards. Order is expected to be by the shard index from Long.MAX to Long.MIN
+ * @param shardIterator The iterator of all shards. Order is expected to be by the shard index from Long.MAX to Long.MIN
* @param minDelta The minimum delta we allow to consider shards the same group
*/
- public ShardEntryGroupIterator( final Iterator<Shard> sourceIterator, final long minDelta ) {
- this.sourceIterator = new PushbackIterator( sourceIterator );
+ public ShardEntryGroupIterator( final Iterator<Shard> shardIterator, final long minDelta ) {
+ this.sourceIterator = new PushbackIterator( shardIterator );
+
+ /**
+ * If we don't have any shards, we need to push our "MIN" shard into the list
+ */
+ if(!sourceIterator.hasNext()){
+ sourceIterator.pushback( new Shard(0, 0, true) );
+ }
+
this.minDelta = minDelta;
}
@@ -39,7 +47,7 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
advance();
}
- return next == null;
+ return next != null;
}
@@ -69,29 +77,29 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
*/
private void advance() {
-
- final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( minDelta );
-
-
/**
* We loop through until we've exhausted our source, or we have 2 elements, which means
* they're > min time allocation from one another
*/
while ( sourceIterator.hasNext() ) {
+ if(next == null){
+ next = new ShardEntryGroup( minDelta );
+ }
final Shard shard = sourceIterator.next();
//we can't add this one to the entries, it doesn't fit within the delta, allocate a new one and break
- if ( shardEntryGroup.addShard( shard ) ) {
+ if ( next.addShard( shard ) ) {
continue;
}
- //we can't add this shard to the current group. Add the group and return.
- next = shardEntryGroup;
sourceIterator.pushback( shard );
+ break;
}
+
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9fa368e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
index d2e2ddb..558d3fc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
@@ -29,6 +29,7 @@ import javax.inject.Inject;
import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.graph.Edge;
@@ -67,22 +68,27 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
protected final CassandraConfig cassandraConfig;
protected final GraphFig graphFig;
protected final EdgeShardStrategy writeEdgeShardStrategy;
+ protected final TimeService timeService;
@Inject
public ShardedEdgeSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
- final GraphFig graphFig, final EdgeShardStrategy writeEdgeShardStrategy ) {
+ final GraphFig graphFig, final EdgeShardStrategy writeEdgeShardStrategy,
+ final TimeService timeService ) {
+
checkNotNull( "keyspace required", keyspace );
checkNotNull( "cassandraConfig required", cassandraConfig );
checkNotNull( "consistencyFig required", graphFig );
checkNotNull( "writeEdgeShardStrategy required", writeEdgeShardStrategy );
+ checkNotNull( "timeService required", timeService );
this.keyspace = keyspace;
this.cassandraConfig = cassandraConfig;
this.graphFig = graphFig;
this.writeEdgeShardStrategy = writeEdgeShardStrategy;
+ this.timeService = timeService;
}
@@ -198,6 +204,8 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
*/
+ final long time = timeService.getCurrentTime();
+
final DirectedEdge sourceEdge = new DirectedEdge( targetNodeId, timestamp );
final ShardEntryGroup sourceRowKeyShard =
@@ -207,70 +215,70 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
columnFamilies.getSourceNodeCfName();
-// for ( Shard shard : sourceRowKeyShard.getEntries() ) {
-//
-// final long shardId = shard.getShardIndex();
-// final RowKey sourceRowKey = new RowKey( sourceNodeId, type, shardId );
-// op.writeEdge( sourceCf, sourceRowKey, sourceEdge );
-// op.countEdge( sourceNodeId, NodeType.SOURCE, shardId, type );
-// }
-//
-//
-// final ShardEntryGroup sourceWithTypeRowKeyShard = writeEdgeShardStrategy
-// .getWriteShards( scope, sourceNodeId, NodeType.SOURCE, timestamp, type, targetNodeType );
-//
-// final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> targetCf =
-// columnFamilies.getSourceNodeTargetTypeCfName();
-//
-// for ( Shard shard : sourceWithTypeRowKeyShard.getEntries() ) {
-//
-// final long shardId = shard.getShardIndex();
-// final RowKeyType sourceRowKeyType = new RowKeyType( sourceNodeId, type, targetNodeId, shardId );
-//
-// op.writeEdge( targetCf, sourceRowKeyType, sourceEdge );
-// op.countEdge( sourceNodeId, NodeType.SOURCE, shardId, type, targetNodeType );
-// }
-//
-//
-// /**
-// * write edges from target<-source
-// */
-//
-// final DirectedEdge targetEdge = new DirectedEdge( sourceNodeId, timestamp );
-//
-//
-// final ShardEntryGroup targetRowKeyShard =
-// writeEdgeShardStrategy.getWriteShards( scope, targetNodeId, NodeType.TARGET, timestamp, type );
-//
-// final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> sourceByTargetCf =
-// columnFamilies.getTargetNodeCfName();
-//
-// for ( Shard shard : targetRowKeyShard.getEntries() ) {
-// final long shardId = shard.getShardIndex();
-// final RowKey targetRowKey = new RowKey( targetNodeId, type, shardId );
-//
-// op.writeEdge( sourceByTargetCf, targetRowKey, targetEdge );
-// op.countEdge( targetNodeId, NodeType.TARGET, shardId, type );
-// }
-//
-//
-// final ShardEntryGroup targetWithTypeRowKeyShard = writeEdgeShardStrategy
-// .getWriteShards( scope, targetNodeId, NodeType.TARGET, timestamp, type, sourceNodeType );
-//
-// final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> targetBySourceCf =
-// columnFamilies.getTargetNodeSourceTypeCfName();
-//
-//
-// for ( Shard shard : targetWithTypeRowKeyShard.getEntries() ) {
-//
-// final long shardId = shard.getShardIndex();
-//
-// final RowKeyType targetRowKeyType = new RowKeyType( targetNodeId, type, sourceNodeId, shardId );
-//
-//
-// op.writeEdge( targetBySourceCf, targetRowKeyType, targetEdge );
-// op.countEdge( targetNodeId, NodeType.TARGET, shardId, type, sourceNodeType );
-// }
+ for ( Shard shard : sourceRowKeyShard.getWriteShards(time) ) {
+
+ final long shardId = shard.getShardIndex();
+ final RowKey sourceRowKey = new RowKey( sourceNodeId, type, shardId );
+ op.writeEdge( sourceCf, sourceRowKey, sourceEdge );
+ op.countEdge( sourceNodeId, NodeType.SOURCE, shardId, type );
+ }
+
+
+ final ShardEntryGroup sourceWithTypeRowKeyShard = writeEdgeShardStrategy
+ .getWriteShards( scope, sourceNodeId, NodeType.SOURCE, timestamp, type, targetNodeType );
+
+ final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> targetCf =
+ columnFamilies.getSourceNodeTargetTypeCfName();
+
+ for ( Shard shard : sourceWithTypeRowKeyShard.getWriteShards(time) ) {
+
+ final long shardId = shard.getShardIndex();
+ final RowKeyType sourceRowKeyType = new RowKeyType( sourceNodeId, type, targetNodeId, shardId );
+
+ op.writeEdge( targetCf, sourceRowKeyType, sourceEdge );
+ op.countEdge( sourceNodeId, NodeType.SOURCE, shardId, type, targetNodeType );
+ }
+
+
+ /**
+ * write edges from target<-source
+ */
+
+ final DirectedEdge targetEdge = new DirectedEdge( sourceNodeId, timestamp );
+
+
+ final ShardEntryGroup targetRowKeyShard =
+ writeEdgeShardStrategy.getWriteShards( scope, targetNodeId, NodeType.TARGET, timestamp, type );
+
+ final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> sourceByTargetCf =
+ columnFamilies.getTargetNodeCfName();
+
+ for ( Shard shard : targetRowKeyShard.getWriteShards(time) ) {
+ final long shardId = shard.getShardIndex();
+ final RowKey targetRowKey = new RowKey( targetNodeId, type, shardId );
+
+ op.writeEdge( sourceByTargetCf, targetRowKey, targetEdge );
+ op.countEdge( targetNodeId, NodeType.TARGET, shardId, type );
+ }
+
+
+ final ShardEntryGroup targetWithTypeRowKeyShard = writeEdgeShardStrategy
+ .getWriteShards( scope, targetNodeId, NodeType.TARGET, timestamp, type, sourceNodeType );
+
+ final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> targetBySourceCf =
+ columnFamilies.getTargetNodeSourceTypeCfName();
+
+
+ for ( Shard shard : targetWithTypeRowKeyShard.getWriteShards(time) ) {
+
+ final long shardId = shard.getShardIndex();
+
+ final RowKeyType targetRowKeyType = new RowKeyType( targetNodeId, type, sourceNodeId, shardId );
+
+
+ op.writeEdge( targetBySourceCf, targetRowKeyType, targetEdge );
+ op.countEdge( targetNodeId, NodeType.TARGET, shardId, type, sourceNodeType );
+ }
/**
* Always a 0l shard, we're hard limiting 2b timestamps for the same edge
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9fa368e/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/ComittedGraphManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/ComittedGraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/ComittedGraphManagerIT.java
deleted file mode 100644
index 596480d..0000000
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/ComittedGraphManagerIT.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing,
- * * software distributed under the License is distributed on an
- * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * * KIND, either express or implied. See the License for the
- * * specific language governing permissions and limitations
- * * under the License.
- *
- */
-package org.apache.usergrid.persistence.graph;
-
-
-import java.util.concurrent.TimeUnit;
-
-import org.jukito.JukitoRunner;
-import org.jukito.UseModules;
-import org.junit.runner.RunWith;
-
-import org.apache.usergrid.persistence.core.cassandra.ITRunner;
-import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-
-
-/**
- * Integration test that performs all calls immediately after writes without blocking. Tests that our
- * view is immediately consistent to our users, even if we have yet to perform background processing
- */
-@RunWith(ITRunner.class)
-@UseModules({ TestGraphModule.class })
-public class ComittedGraphManagerIT extends GraphManagerIT {
-
-
- @Override
- protected GraphManager getHelper(GraphManager gm) {
- return new ComittedGraphTestHelper( gm );
- }
-
-
- /**
- * Doesn't wait for the async process to happen before returning. Simply executes and immediately returns.
- */
- public static class ComittedGraphTestHelper implements GraphManager {
-
- private final GraphManager graphManager;
-
-
- public ComittedGraphTestHelper( final GraphManager graphManager ) {
- this.graphManager = graphManager;
- }
-
-
- @Override
- public Observable<Edge> writeEdge( final Edge edge ) {
- return graphManager.writeEdge( edge );
- }
-
-
- @Override
- public Observable<Edge> deleteEdge( final Edge edge ) {
- return graphManager.deleteEdge( edge );
- }
-
-
- @Override
- public Observable<Id> deleteNode( final Id node, final long timestamp) {
- return graphManager.deleteNode( node, timestamp );
- }
-
-
- @Override
- public Observable<Edge> loadEdgeVersions( final SearchByEdge edge ) {
- return graphManager.loadEdgeVersions( edge );
- }
-
-
- @Override
- public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
- return graphManager.loadEdgesFromSource( search );
- }
-
-
- @Override
- public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
- return graphManager.loadEdgesToTarget( search );
- }
-
-
- @Override
- public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
- return graphManager.loadEdgesFromSourceByType(search);
- }
-
-
- @Override
- public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
- return graphManager.loadEdgesToTargetByType( search );
- }
-
-
- @Override
- public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) {
- return graphManager.getEdgeTypesFromSource( search );
- }
-
-
- @Override
- public Observable<String> getIdTypesFromSource( final SearchIdType search ) {
- return graphManager.getIdTypesFromSource( search );
- }
-
-
- @Override
- public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) {
- return graphManager.getEdgeTypesToTarget( search );
- }
-
-
- @Override
- public Observable<String> getIdTypesToTarget( final SearchIdType search ) {
- return graphManager.getIdTypesToTarget( search );
- }
-
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9fa368e/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java
new file mode 100644
index 0000000..3c04ab4
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java
@@ -0,0 +1,138 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.graph;
+
+
+import java.util.concurrent.TimeUnit;
+
+import org.jukito.JukitoRunner;
+import org.jukito.UseModules;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.core.cassandra.ITRunner;
+import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Integration test that performs all calls immediately after writes without blocking. Tests that our
+ * view is immediately consistent to our users, even if we have yet to perform background processing
+ */
+@RunWith(ITRunner.class)
+@UseModules({ TestGraphModule.class })
+public class CommittedGraphManagerIT extends GraphManagerIT {
+
+
+ @Override
+ protected GraphManager getHelper(GraphManager gm) {
+ return new ComittedGraphTestHelper( gm );
+ }
+
+
+ /**
+ * Doesn't wait for the async process to happen before returning. Simply executes and immediately returns.
+ */
+ public static class ComittedGraphTestHelper implements GraphManager {
+
+ private final GraphManager graphManager;
+
+
+ public ComittedGraphTestHelper( final GraphManager graphManager ) {
+ this.graphManager = graphManager;
+ }
+
+
+ @Override
+ public Observable<Edge> writeEdge( final Edge edge ) {
+ return graphManager.writeEdge( edge );
+ }
+
+
+ @Override
+ public Observable<Edge> deleteEdge( final Edge edge ) {
+ return graphManager.deleteEdge( edge );
+ }
+
+
+ @Override
+ public Observable<Id> deleteNode( final Id node, final long timestamp) {
+ return graphManager.deleteNode( node, timestamp );
+ }
+
+
+ @Override
+ public Observable<Edge> loadEdgeVersions( final SearchByEdge edge ) {
+ return graphManager.loadEdgeVersions( edge );
+ }
+
+
+ @Override
+ public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
+ return graphManager.loadEdgesFromSource( search );
+ }
+
+
+ @Override
+ public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
+ return graphManager.loadEdgesToTarget( search );
+ }
+
+
+ @Override
+ public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
+ return graphManager.loadEdgesFromSourceByType(search);
+ }
+
+
+ @Override
+ public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
+ return graphManager.loadEdgesToTargetByType( search );
+ }
+
+
+ @Override
+ public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) {
+ return graphManager.getEdgeTypesFromSource( search );
+ }
+
+
+ @Override
+ public Observable<String> getIdTypesFromSource( final SearchIdType search ) {
+ return graphManager.getIdTypesFromSource( search );
+ }
+
+
+ @Override
+ public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) {
+ return graphManager.getEdgeTypesToTarget( search );
+ }
+
+
+ @Override
+ public Observable<String> getIdTypesToTarget( final SearchIdType search ) {
+ return graphManager.getIdTypesToTarget( search );
+ }
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9fa368e/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
index 00e0164..3d98486 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
@@ -109,33 +109,13 @@ public class EdgeShardSerializationTest {
Iterator<Shard> results =
edgeShardSerialization.getShardMetaData( scope, now, NodeType.SOURCE, Optional.<Shard>absent(), types );
- Shard next = results.next();
-
- assertEquals( shard3, next.getShardIndex() );
-
- assertEquals( shard3.getCreatedTime(), next.getCreatedTime() );
-
- assertEquals( shard3.isCompacted(), next.isCompacted() );
-
-
- next = results.next();
-
-
- assertEquals( shard2, next.getShardIndex() );
-
- assertEquals( shard2.getCreatedTime(), next.getCreatedTime() );
-
- assertEquals( shard2.isCompacted(), next.isCompacted() );
-
-
- next = results.next();
+ assertEquals( shard3, results.next() );
- assertEquals( shard1, next.getShardIndex() );
+ assertEquals( shard2, results.next() );
- assertEquals( shard1.getCreatedTime(), next.getCreatedTime() );
+ assertEquals( shard1, results.next() );
- assertEquals( shard1.isCompacted(), next.isCompacted() );
assertFalse( results.hasNext() );
@@ -149,24 +129,10 @@ public class EdgeShardSerializationTest {
//test paging and size
results = edgeShardSerialization.getShardMetaData( scope, now, NodeType.SOURCE, Optional.of( shard2 ), types );
- next = results.next();
-
-
- assertEquals( shard2, next.getShardIndex() );
-
- assertEquals( shard2.getCreatedTime(), next.getCreatedTime() );
-
- assertEquals( shard2.isCompacted(), next.isCompacted() );
-
-
- next = results.next();
-
-
- assertEquals( shard1, next.getShardIndex() );
+ assertEquals( shard2, results.next() );
- assertEquals( shard1.getCreatedTime(), next.getCreatedTime() );
- assertEquals( shard1.isCompacted(), next.isCompacted() );
+ assertEquals( shard1, results.next() );
assertFalse( results.hasNext() );
@@ -225,9 +191,9 @@ public class EdgeShardSerializationTest {
results =
edgeShardSerialization.getShardMetaData( scope, now, NodeType.SOURCE, Optional.<Shard>absent(), types );
- assertEquals( shard3, results.next().getShardIndex() );
+ assertEquals( shard3, results.next() );
- assertEquals( shard2, results.next().getShardIndex() );
+ assertEquals( shard2, results.next() );
assertFalse( results.hasNext() );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9fa368e/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index 0ac7f78..a31a8ad 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -86,556 +86,555 @@ public class NodeShardAllocationTest {
}
-// @Test
-// public void minTime() {
-// final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
-//
-// final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
-//
-// final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
-//
-// final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
-//
-//
-// final TimeService timeService = mock( TimeService.class );
-//
-// final Keyspace keyspace = mock( Keyspace.class );
-//
-//
-// NodeShardAllocation approximation =
-// new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-// nodeShardCounterSerialization, timeService, graphFig, keyspace );
-//
-//
-// final long timeservicetime = System.currentTimeMillis();
-//
-// when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
-//
-// final long expected = timeservicetime - 2 * graphFig.getShardCacheTimeout();
-//
-// final long returned = approximation.getMinTime();
-//
-// assertEquals( "Correct time was returned", expected, returned );
-// }
-//
-//
-// @Test
-// public void noShards() {
-// final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
-//
-// final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
-//
-// final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
-//
-//
-// final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
-//
-//
-// final TimeService timeService = mock( TimeService.class );
-//
-// final Keyspace keyspace = mock( Keyspace.class );
-//
-// final MutationBatch batch = mock( MutationBatch.class );
-//
-// when( keyspace.prepareMutationBatch() ).thenReturn( batch );
-//
-// NodeShardAllocation approximation =
-// new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-// nodeShardCounterSerialization, timeService, graphFig, keyspace );
-//
-// final Id nodeId = createId( "test" );
-// final String type = "type";
-// final String subType = "subType";
-//
-// /**
-// * Mock up returning an empty iterator, our audit shouldn't create a new shard
-// */
-// when( edgeShardSerialization
-// .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.SOURCE ), any( Optional.class ),
-// same( type ), same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
-//
-// final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.SOURCE, type, subType );
-//
-// assertFalse( "No shard allocated", result );
-// }
-//
-//
-// @Test
-// public void existingFutureShardSameTime() {
-// final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
-//
-// final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
-//
-// final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
-//
-// final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
-//
-//
-// final TimeService timeService = mock( TimeService.class );
-//
-// final Keyspace keyspace = mock( Keyspace.class );
-//
-//
-// final MutationBatch batch = mock( MutationBatch.class );
-//
-// when( keyspace.prepareMutationBatch() ).thenReturn( batch );
-//
-//
-// NodeShardAllocation approximation =
-// new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-// nodeShardCounterSerialization, timeService, graphFig, keyspace );
-//
-// final Id nodeId = createId( "test" );
-// final String type = "type";
-// final String subType = "subType";
-//
-//
-// final long timeservicetime = System.currentTimeMillis();
-//
-// when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
-//
-// final Shard futureShard = new Shard( 10000l, timeservicetime, compacted );
-//
-// /**
-// * Mock up returning a min shard, and a future shard
-// */
-// when( edgeShardSerialization
-// .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
-// same( type ), same( subType ) ) ).thenReturn( Arrays.asList( futureShard ).iterator() );
-//
-// final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.TARGET, type, subType );
-//
-// assertFalse( "No shard allocated", result );
-// }
-//
-//
-// @Test
-// public void lowCountFutureShard() {
-// final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
-//
-// final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
-//
-// final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
-//
-// final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-//
-//
-// final TimeService timeService = mock( TimeService.class );
-//
-// final Keyspace keyspace = mock( Keyspace.class );
-//
-// final MutationBatch batch = mock( MutationBatch.class );
-//
-// when( keyspace.prepareMutationBatch() ).thenReturn( batch );
-//
-//
-// NodeShardAllocation approximation =
-// new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-// nodeShardApproximation, timeService, graphFig, keyspace );
-//
-// final Id nodeId = createId( "test" );
-// final String type = "type";
-// final String subType = "subType";
-//
-//
-// final long timeservicetime = System.currentTimeMillis();
-//
-// when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
-//
-//
-// /**
-// * Mock up returning a min shard, and a future shard
-// */
-// when( edgeShardSerialization
-// .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
-// same( type ), same( subType ) ) ).thenReturn( Arrays.asList( new Shard( 0l, 0l, compacted ) ).iterator() );
-//
-//
-// //return a shard size < our max by 1
-//
-// final long count = graphFig.getShardSize() - 1;
-//
-// when( nodeShardApproximation.getCount( scope, nodeId, NodeType.TARGET, 0l, type, subType ) )
-// .thenReturn( count );
-//
-// final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.TARGET, type, subType );
-//
-// assertFalse( "Shard allocated", result );
-// }
-//
-//
-// @Test
-// public void equalCountFutureShard() {
-// final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
-//
-// final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
-//
-// final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
-//
-// final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-//
-//
-// final TimeService timeService = mock( TimeService.class );
-//
-// final Keyspace keyspace = mock( Keyspace.class );
-//
-// final MutationBatch batch = mock( MutationBatch.class );
-//
-// when( keyspace.prepareMutationBatch() ).thenReturn( batch );
-//
-//
-// NodeShardAllocation approximation =
-// new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-// nodeShardApproximation, timeService, graphFig, keyspace );
-//
-// final Id nodeId = createId( "test" );
-// final String type = "type";
-// final String subType = "subType";
-//
-//
-// final long timeservicetime = System.currentTimeMillis();
-//
-// when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
-//
-//
-// /**
-// * Mock up returning a min shard
-// */
-// when( edgeShardSerialization
-// .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.SOURCE ), any( Optional.class ),
-// same( type ), same( subType ) ) ).thenReturn( Arrays.asList( new Shard( 0l, 0l, compacted ) ).iterator() );
-//
-//
-// final long shardCount = graphFig.getShardSize();
-//
-// //return a shard size equal to our max
-// when( nodeShardApproximation.getCount( scope, nodeId, NodeType.SOURCE, 0l, type, subType ) )
-// .thenReturn( shardCount );
-//
-// ArgumentCaptor<Long> shardValue = ArgumentCaptor.forClass( Long.class );
-// ArgumentCaptor<Long> timestampValue = ArgumentCaptor.forClass( Long.class );
-//
-//
-// //mock up our mutation
-// when( edgeShardSerialization
-// .writeShardMeta( same( scope ), same( nodeId ), eq( NodeType.SOURCE ), shardValue.capture(),
-// timestampValue.capture(), same( type ), same( subType ) ) )
-// .thenReturn( mock( MutationBatch.class ) );
-//
-//
-// final SimpleMarkedEdge returnedEdge =
-// new SimpleMarkedEdge( nodeId, type, createId( "subType" ), 10005l, false );
-// final Iterator<MarkedEdge> edgeIterator = Collections.singleton( ( MarkedEdge ) returnedEdge ).iterator();
-//
-// //mock up returning the value
-// when( shardedEdgeSerialization
-// .getEdgesFromSourceByTargetType( same( edgeColumnFamilies ), same( scope ), any( SearchByIdType.class ),
-// any( Iterator.class ) ) ).thenReturn( edgeIterator );
-//
-//
-// final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.SOURCE, type, subType );
-//
-// assertTrue( "Shard allocated", result );
-//
-// //check our new allocated UUID
-//
-//
-// final long savedTimestamp = timestampValue.getValue();
-//
-//
-// assertEquals( "Expected time service time", timeservicetime, savedTimestamp );
-//
-//
-// //now check our max value was set
-//
-// final long savedShardPivot = shardValue.getValue();
-//
-// assertEquals( "Expected max value to be the same", returnedEdge.getTimestamp(), savedShardPivot );
-// }
-//
-//
-// @Test
-// public void futureCountShardCleanup() {
-// final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
-//
-// final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
-//
-// final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
-//
-// final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-//
-//
-// final TimeService timeService = mock( TimeService.class );
-//
-// final Keyspace keyspace = mock( Keyspace.class );
-//
-// final MutationBatch batch = mock( MutationBatch.class );
-//
-// when( keyspace.prepareMutationBatch() ).thenReturn( batch );
-//
-//
-// NodeShardAllocation approximation =
-// new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-// nodeShardApproximation, timeService, graphFig, keyspace );
-//
-// final Id nodeId = createId( "test" );
-// final String type = "type";
-// final String subType = "subType";
-//
-//
-// /**
-// * Use the time service to generate timestamps
-// */
-// final long timeservicetime = 10000;
-//
-//
-// when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
-//
-// assertTrue( "Shard cache mocked", graphFig.getShardCacheTimeout() > 0 );
-//
-//
-// /**
-// * Simulates clock drift when 2 nodes create future shards near one another
-// */
-// final long minDelta = graphFig.getShardMinDelta();
-//
-//
-// final Shard minShard = new Shard( 0l, 0l, compacted );
-//
-// //a shard that isn't our minimum, but exists after compaction
-// final Shard compactedShard = new Shard( 5000, 1000, compacted );
-//
-// /**
-// * Simulate different node time allocation
-// */
-//
-// final long minTime = 10000;
-// //our second shard is the "oldest", and hence should be returned in the iterator. Future shard 1 and 3
-// // should be removed
-//
-// //this should get dropped, It's allocated after future shard2 even though the time is less
-// final Shard futureShard1 = new Shard( 10000, minTime + minDelta, compacted );
-//
-// //should get kept.
-// final Shard futureShard2 = new Shard( 10005, minTime, compacted );
-//
-// //should be removed
-// final Shard futureShard3 = new Shard( 10010, minTime + minDelta / 2, compacted );
-//
-// /**
-// * Mock up returning a min shard
-// */
-// when( edgeShardSerialization
-// .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
-// same( type ), same( subType ) ) ).thenReturn(
-// Arrays.asList( futureShard3, futureShard2, futureShard1, compactedShard, minShard ).iterator() );
-//
-//
-// ArgumentCaptor<Long> newLongValue = ArgumentCaptor.forClass( Long.class );
-//
-//
-// //mock up our mutation
-// when( edgeShardSerialization
-// .removeShardMeta( same( scope ), same( nodeId ), eq( NodeType.TARGET ), newLongValue.capture(),
-// same( type ), same( subType ) ) ).thenReturn( mock( MutationBatch.class ) );
-//
-//
-// final Iterator<ShardEntryGroup> result =
-// approximation.getShards( scope, nodeId, NodeType.TARGET, Optional.<Shard>absent(), type, subType );
-//
-//
-// assertTrue( "Shards present", result.hasNext() );
-//
-//
-// ShardEntryGroup shardEntryGroup = result.next();
-//
-// assertEquals( "Future shard returned", futureShard1, shardEntryGroup.getCompactionTarget() );
-//
-//
-// //now verify all 4 are in this group. This is because the first shard (0,0) (n-1_ may be the only shard other
-// //nodes see while we're rolling our state. This means it should be read and merged from as well
-//
-// Collection<Shard> writeShards = shardEntryGroup.getWriteShards();
-//
-// assertEquals( "Shard size as expected", 4, writeShards.size() );
-//
-// assertTrue( writeShards.contains( futureShard1 ) );
-// assertTrue( writeShards.contains( futureShard2 ) );
-// assertTrue( writeShards.contains( futureShard3 ) );
-// assertTrue( writeShards.contains( compactedShard ) );
-//
-//
-// Collection<Shard> readShards = shardEntryGroup.getReadShards( minTime + minDelta );
-//
-// assertEquals( "Shard size as expected", 4, readShards.size() );
-//
-// assertTrue( readShards.contains( futureShard1 ) );
-// assertTrue( readShards.contains( futureShard2 ) );
-// assertTrue( readShards.contains( futureShard3 ) );
-// assertTrue( readShards.contains( compactedShard ) );
-//
-//
-// assertTrue( "Shards present", result.hasNext() );
-//
-// shardEntryGroup = result.next();
-//
-// writeShards = shardEntryGroup.getWriteShards();
-//
-//
-// assertTrue( "Previous shard present", writeShards.contains( minShard ) );
-//
-//
-// writeShards = shardEntryGroup.getReadShards( minTime + minDelta );
-//
-//
-// assertTrue( "Previous shard present", writeShards.contains( minShard ) );
-//
-//
-// assertFalse( "No shards left", result.hasNext() );
-// }
-//
-//
-// @Test
-// public void noShardsReturns() {
-// final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
-//
-// final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
-//
-// final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
-//
-// final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-//
-//
-// final TimeService timeService = mock( TimeService.class );
-//
-// when( timeService.getCurrentTime() ).thenReturn( 10000l );
-//
-// final Keyspace keyspace = mock( Keyspace.class );
-//
-// final MutationBatch batch = mock( MutationBatch.class );
-//
-// when( keyspace.prepareMutationBatch() ).thenReturn( batch );
-//
-// NodeShardAllocation approximation =
-// new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-// nodeShardApproximation, timeService, graphFig, keyspace );
-//
-// final Id nodeId = createId( "test" );
-// final String type = "type";
-// final String subType = "subType";
-//
-// /**
-// * Mock up returning an empty iterator, our audit shouldn't create a new shard
-// */
-// when( edgeShardSerialization
-// .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
-// same( type ), same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
-//
-// final Iterator<ShardEntryGroup> result =
-// approximation.getShards( scope, nodeId, NodeType.TARGET, Optional.<Shard>absent(), type, subType );
-//
-//
-// ShardEntryGroup shardEntryGroup = result.next();
-//
-// final Shard expected = new Shard( 0, 0, compacted );
-//
-// assertEquals( "Future shard returned", expected, shardEntryGroup.getCompactionTarget() );
-//
-//
-// //now verify all 4 are in this group. This is because the first shard (0,0) (n-1_ may be the only shard other
-// //nodes see while we're rolling our state. This means it should be read and merged from as well
-//
-// Collection<Shard> writeShards = shardEntryGroup.getWriteShards();
-//
-// Collection<Shard> readShards = shardEntryGroup.getReadShards( 10000l );
-//
-//
-// assertTrue( "0 shard allocated", writeShards.contains( expected ) );
-//
-// assertTrue( "0 shard allocated", readShards.contains( expected ) );
-//
-//
-// assertFalse( "No shard allocated", result.hasNext() );
-// }
-//
-//
-// @Test
-// public void invalidConfiguration() {
-//
-// final GraphFig graphFig = mock( GraphFig.class );
-//
-// final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
-//
-// final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
-//
-// final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
-//
-// final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-//
-//
-// /**
-// * Return 100000 milliseconds
-// */
-// final TimeService timeService = mock( TimeService.class );
-//
-// final long time = 100000l;
-//
-// when( timeService.getCurrentTime() ).thenReturn( time );
-//
-//
-// final long cacheTimeout = 30000l;
-//
-// when( graphFig.getShardCacheTimeout() ).thenReturn( 30000l );
-//
-//
-// final long tooSmallDelta = ( long ) ( ( cacheTimeout * 2 ) * .99 );
-//
-// when( graphFig.getShardMinDelta() ).thenReturn( tooSmallDelta );
-//
-//
-// final Keyspace keyspace = mock( Keyspace.class );
-//
-// final MutationBatch batch = mock( MutationBatch.class );
-//
-// when( keyspace.prepareMutationBatch() ).thenReturn( batch );
-//
-//
-// NodeShardAllocation approximation =
-// new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-// nodeShardApproximation, timeService, graphFig, keyspace );
-//
-//
-// /**
-// * Should throw an exception
-// */
-// try {
-// approximation.getMinTime();
-// fail( "Should have thrown a GraphRuntimeException" );
-// }
-// catch ( GraphRuntimeException gre ) {
-// //swallow
-// }
-//
-// //now test something that passes.
-//
-// final long minDelta = cacheTimeout * 2;
-//
-// when( graphFig.getShardMinDelta() ).thenReturn( minDelta );
-//
-// long returned = approximation.getMinTime();
-//
-// long expectedReturned = time - minDelta;
-//
-// assertEquals( expectedReturned, returned );
-//
-// final long delta = cacheTimeout * 4;
-//
-// when( graphFig.getShardMinDelta() ).thenReturn( delta );
-//
-// returned = approximation.getMinTime();
-//
-// expectedReturned = time - delta;
-//
-// assertEquals( expectedReturned, returned );
-// }
+ @Test
+ public void minTime() {
+ final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+ final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+ final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+ final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
+
+
+ final TimeService timeService = mock( TimeService.class );
+
+ final Keyspace keyspace = mock( Keyspace.class );
+
+
+ NodeShardAllocation approximation =
+ new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+ nodeShardCounterSerialization, timeService, graphFig, keyspace );
+
+
+ final long timeservicetime = System.currentTimeMillis();
+
+ when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
+
+ final long expected = timeservicetime - 2 * graphFig.getShardCacheTimeout();
+
+ final long returned = approximation.getMinTime();
+
+ assertEquals( "Correct time was returned", expected, returned );
+ }
+
+
+ @Test
+ public void noShards() {
+ final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+ final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+ final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+
+ final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
+
+
+ final TimeService timeService = mock( TimeService.class );
+
+ final Keyspace keyspace = mock( Keyspace.class );
+
+ final MutationBatch batch = mock( MutationBatch.class );
+
+ when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+ NodeShardAllocation approximation =
+ new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+ nodeShardCounterSerialization, timeService, graphFig, keyspace );
+
+ final Id nodeId = createId( "test" );
+ final String type = "type";
+ final String subType = "subType";
+
+ /**
+ * Mock up returning an empty iterator, our audit shouldn't create a new shard
+ */
+ when( edgeShardSerialization
+ .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.SOURCE ), any( Optional.class ),
+ same( type ), same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
+
+ final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.SOURCE, type, subType );
+
+ assertFalse( "No shard allocated", result );
+ }
+
+
+ @Test
+ public void existingFutureShardSameTime() {
+ final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+ final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+ final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+ final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
+
+
+ final TimeService timeService = mock( TimeService.class );
+
+ final Keyspace keyspace = mock( Keyspace.class );
+
+
+ final MutationBatch batch = mock( MutationBatch.class );
+
+ when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+
+ NodeShardAllocation approximation =
+ new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+ nodeShardCounterSerialization, timeService, graphFig, keyspace );
+
+ final Id nodeId = createId( "test" );
+ final String type = "type";
+ final String subType = "subType";
+
+
+ final long timeservicetime = System.currentTimeMillis();
+
+ when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
+
+ final Shard futureShard = new Shard( 10000l, timeservicetime, true );
+
+ /**
+ * Mock up returning only a single future shard (no min shard is returned here)
+ */
+ when( edgeShardSerialization
+ .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
+ same( type ), same( subType ) ) ).thenReturn( Arrays.asList( futureShard ).iterator() );
+
+ final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.TARGET, type, subType );
+
+ assertFalse( "No shard allocated", result );
+ }
+
+
+ @Test
+ public void lowCountFutureShard() {
+ final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+ final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+ final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+ final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
+
+
+ final TimeService timeService = mock( TimeService.class );
+
+ final Keyspace keyspace = mock( Keyspace.class );
+
+ final MutationBatch batch = mock( MutationBatch.class );
+
+ when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+
+ NodeShardAllocation approximation =
+ new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+ nodeShardApproximation, timeService, graphFig, keyspace );
+
+ final Id nodeId = createId( "test" );
+ final String type = "type";
+ final String subType = "subType";
+
+
+ final long timeservicetime = System.currentTimeMillis();
+
+ when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
+
+
+ /**
+ * Mock up returning only the min shard; no future shard exists yet
+ */
+ when( edgeShardSerialization
+ .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
+ same( type ), same( subType ) ) ).thenReturn( Arrays.asList( new Shard( 0l, 0l, true ) ).iterator() );
+
+
+ //return a shard size < our max by 1
+
+ final long count = graphFig.getShardSize() - 1;
+
+ when( nodeShardApproximation.getCount( scope, nodeId, NodeType.TARGET, 0l, type, subType ) )
+ .thenReturn( count );
+
+ final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.TARGET, type, subType );
+
+ assertFalse( "Shard allocated", result );
+ }
+
+
+ @Test
+ public void equalCountFutureShard() {
+ final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+ final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+ final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+ final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
+
+
+ final TimeService timeService = mock( TimeService.class );
+
+ final Keyspace keyspace = mock( Keyspace.class );
+
+ final MutationBatch batch = mock( MutationBatch.class );
+
+ when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+
+ NodeShardAllocation approximation =
+ new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+ nodeShardApproximation, timeService, graphFig, keyspace );
+
+ final Id nodeId = createId( "test" );
+ final String type = "type";
+ final String subType = "subType";
+
+
+ final long timeservicetime = System.currentTimeMillis();
+
+ when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
+
+
+ /**
+ * Mock up returning a min shard
+ */
+ when( edgeShardSerialization
+ .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.SOURCE ), any( Optional.class ),
+ same( type ), same( subType ) ) ).thenReturn( Arrays.asList( new Shard( 0l, 0l, true ) ).iterator() );
+
+
+ final long shardCount = graphFig.getShardSize();
+
+ //return a shard size equal to our max
+ when( nodeShardApproximation.getCount( scope, nodeId, NodeType.SOURCE, 0l, type, subType ) )
+ .thenReturn( shardCount );
+
+ ArgumentCaptor<Shard> shardValue = ArgumentCaptor.forClass( Shard.class );
+
+
+ //mock up our mutation
+ when( edgeShardSerialization
+ .writeShardMeta( same( scope ), same( nodeId ), eq( NodeType.SOURCE ), shardValue.capture(), same( type ), same( subType ) ) )
+ .thenReturn( mock( MutationBatch.class ) );
+
+
+ final SimpleMarkedEdge returnedEdge =
+ new SimpleMarkedEdge( nodeId, type, createId( "subType" ), 10005l, false );
+ final Iterator<MarkedEdge> edgeIterator = Collections.singleton( ( MarkedEdge ) returnedEdge ).iterator();
+
+ //mock up returning the value
+ when( shardedEdgeSerialization
+ .getEdgesFromSourceByTargetType( same( edgeColumnFamilies ), same( scope ), any( SearchByIdType.class ),
+ any( Iterator.class ) ) ).thenReturn( edgeIterator );
+
+
+ final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.SOURCE, type, subType );
+
+ assertTrue( "Shard allocated", result );
+
+ //check the created time of our newly allocated shard
+
+
+ final long savedTimestamp = shardValue.getValue().getCreatedTime();
+
+
+ assertEquals( "Expected time service time", timeservicetime, savedTimestamp );
+
+
+ //now check the new shard's index (pivot) was set to the returned edge's timestamp
+
+ final long savedShardPivot = shardValue.getValue().getShardIndex();
+
+ assertEquals( "Expected max value to be the same", returnedEdge.getTimestamp(), savedShardPivot );
+ }
+
+
+ @Test
+    public void futureCountShardCleanup() {
+        /*
+         * Scenario: three uncompacted ("future") shards allocated close together in time, plus a
+         * compacted shard and the minimum (0, 0) shard. The assertions expect two groups back from
+         * getShards: first the three future shards together with the compacted shard, then a group
+         * containing the minimum shard.
+         */
+        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
+
+
+        final TimeService timeService = mock( TimeService.class );
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch batch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+
+        NodeShardAllocation approximation =
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardApproximation, timeService, graphFig, keyspace );
+
+        final Id nodeId = createId( "test" );
+        final String type = "type";
+        final String subType = "subType";
+
+
+        /**
+         * Use the time service to generate timestamps
+         */
+        final long timeservicetime = 10000;
+
+
+        when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
+
+        assertTrue( "Shard cache mocked", graphFig.getShardCacheTimeout() > 0 );
+
+
+        /**
+         * Simulates clock drift when 2 nodes create future shards near one another
+         */
+        final long minDelta = graphFig.getShardMinDelta();
+
+
+        final Shard minShard = new Shard( 0L, 0L, true );
+
+        //a shard that isn't our minimum, but exists after compaction
+        final Shard compactedShard = new Shard( 5000, 1000, true );
+
+        /**
+         * Simulate different node time allocation
+         */
+
+        final long minTime = 10000;
+
+        //Three uncompacted shards whose created times lie between minTime and minTime + minDelta.
+        //The assertions below expect all three to stay in the same group, with futureShard1 as the
+        //compaction target.
+        //NOTE(review): the first constructor argument is presumably the shard index - confirm against Shard
+
+        //first argument is the smallest of the three (10000); created at minTime + minDelta
+        final Shard futureShard1 = new Shard( 10000, minTime + minDelta, false );
+
+        //created at minTime, even though its first argument is larger than futureShard1's
+        final Shard futureShard2 = new Shard( 10005, minTime, false );
+
+        //created at minTime plus half of minDelta
+        final Shard futureShard3 = new Shard( 10010, minTime + minDelta / 2, false );
+
+        /**
+         * Mock up returning all five shards, ordered from futureShard3 down to the min shard
+         */
+        when( edgeShardSerialization
+                .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
+                        same( type ), same( subType ) ) ).thenReturn(
+                Arrays.asList( futureShard3, futureShard2, futureShard1, compactedShard, minShard ).iterator() );
+
+
+        ArgumentCaptor<Shard> newLongValue = ArgumentCaptor.forClass( Shard.class );
+
+
+        //mock up our mutation
+        when( edgeShardSerialization
+                .removeShardMeta( same( scope ), same( nodeId ), eq( NodeType.TARGET ), newLongValue.capture(),
+                        same( type ), same( subType ) ) ).thenReturn( mock( MutationBatch.class ) );
+
+
+        final Iterator<ShardEntryGroup> result =
+                approximation.getShards( scope, nodeId, NodeType.TARGET, Optional.<Shard>absent(), type, subType );
+
+
+        assertTrue( "Shards present", result.hasNext() );
+
+
+        ShardEntryGroup shardEntryGroup = result.next();
+
+        assertEquals( "Future shard returned", futureShard1, shardEntryGroup.getCompactionTarget() );
+
+
+        //now verify all 4 are in this group (the 3 future shards plus compactedShard). Original rationale:
+        //the previous shard (n-1) may be the only shard other nodes see while we're rolling our state.
+        //This means it should be read and merged from as well
+
+        Collection<Shard> writeShards = shardEntryGroup.getWriteShards( minTime + minDelta );
+
+        assertEquals( "Shard size as expected", 4, writeShards.size() );
+
+        assertTrue( writeShards.contains( futureShard1 ) );
+        assertTrue( writeShards.contains( futureShard2 ) );
+        assertTrue( writeShards.contains( futureShard3 ) );
+        assertTrue( writeShards.contains( compactedShard ) );
+
+
+        Collection<Shard> readShards = shardEntryGroup.getReadShards();
+
+        assertEquals( "Shard size as expected", 4, readShards.size() );
+
+        assertTrue( readShards.contains( futureShard1 ) );
+        assertTrue( readShards.contains( futureShard2 ) );
+        assertTrue( readShards.contains( futureShard3 ) );
+        assertTrue( readShards.contains( compactedShard ) );
+
+
+        //the second group must expose the min shard for both writes and reads
+        assertTrue( "Shards present", result.hasNext() );
+
+        shardEntryGroup = result.next();
+
+
+        writeShards = shardEntryGroup.getWriteShards( minTime + minDelta );
+
+
+        assertTrue( "Previous shard present", writeShards.contains( minShard ) );
+
+
+        //keep the read result in readShards so the assertion clearly targets the read path
+        readShards = shardEntryGroup.getReadShards();
+
+
+        assertTrue( "Previous shard present", readShards.contains( minShard ) );
+
+
+        assertFalse( "No shards left", result.hasNext() );
+    }
+
+
+    @Test
+    public void noShardsReturns() {
+        //every collaborator is a mock; only the clock, the keyspace and the shard metadata are stubbed
+        final EdgeShardSerialization shardSerialization = mock( EdgeShardSerialization.class );
+        final EdgeColumnFamilies columnFamilies = mock( EdgeColumnFamilies.class );
+        final ShardedEdgeSerialization shardedSerialization = mock( ShardedEdgeSerialization.class );
+        final NodeShardApproximation shardApproximation = mock( NodeShardApproximation.class );
+
+        final TimeService clock = mock( TimeService.class );
+        when( clock.getCurrentTime() ).thenReturn( 10000L );
+
+        final Keyspace mockKeyspace = mock( Keyspace.class );
+        final MutationBatch mutation = mock( MutationBatch.class );
+        when( mockKeyspace.prepareMutationBatch() ).thenReturn( mutation );
+
+        final NodeShardAllocation allocation =
+                new NodeShardAllocationImpl( shardSerialization, columnFamilies, shardedSerialization,
+                        shardApproximation, clock, graphFig, mockKeyspace );
+
+        final Id nodeId = createId( "test" );
+        final String edgeType = "type";
+        final String edgeSubType = "subType";
+
+        //the serialization reports no shard metadata at all for this node
+        final Iterator<Shard> noShards = Collections.<Shard>emptyList().iterator();
+
+        when( shardSerialization
+                .getShardMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET ), any( Optional.class ),
+                        same( edgeType ), same( edgeSubType ) ) ).thenReturn( noShards );
+
+        final Iterator<ShardEntryGroup> groups =
+                allocation.getShards( scope, nodeId, NodeType.TARGET, Optional.<Shard>absent(), edgeType,
+                        edgeSubType );
+
+        final ShardEntryGroup onlyGroup = groups.next();
+
+        //with nothing stored, the single returned group is expected to hold the uncompacted (0, 0) shard
+        final Shard expected = new Shard( 0, 0, false );
+
+        assertEquals( "Future shard returned", expected, onlyGroup.getCompactionTarget() );
+
+        //that shard must be visible for both writes and reads
+        final Collection<Shard> writeShards = onlyGroup.getWriteShards( clock.getCurrentTime() );
+        final Collection<Shard> readShards = onlyGroup.getReadShards();
+
+        assertTrue( "0 shard allocated", writeShards.contains( expected ) );
+        assertTrue( "0 shard allocated", readShards.contains( expected ) );
+
+        //and no further group follows it
+        assertFalse( "No shard allocated", groups.hasNext() );
+    }
+
+
+    /**
+     * Checks the configuration validation exercised through getMinTime(): a shard min delta just below
+     * twice the shard cache timeout must raise a GraphRuntimeException, while a delta of twice or four
+     * times the timeout yields the current time minus that delta.
+     */
+    @Test
+    public void invalidConfiguration() {
+
+        //local mock deliberately shadows the shared graphFig so the timeouts can be controlled here
+        final GraphFig graphFig = mock( GraphFig.class );
+
+        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
+
+
+        /**
+         * Return 100000 milliseconds
+         */
+        final TimeService timeService = mock( TimeService.class );
+
+        final long time = 100000L;
+
+        when( timeService.getCurrentTime() ).thenReturn( time );
+
+
+        final long cacheTimeout = 30000L;
+
+        //stub with the same constant the deltas below are derived from, so the two cannot drift apart
+        when( graphFig.getShardCacheTimeout() ).thenReturn( cacheTimeout );
+
+
+        //99% of twice the cache timeout: just under the smallest delta this test expects to pass
+        final long tooSmallDelta = ( long ) ( ( cacheTimeout * 2 ) * .99 );
+
+        when( graphFig.getShardMinDelta() ).thenReturn( tooSmallDelta );
+
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch batch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( batch );
+
+
+        NodeShardAllocation approximation =
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardApproximation, timeService, graphFig, keyspace );
+
+
+        /**
+         * Should throw an exception
+         */
+        try {
+            approximation.getMinTime();
+            fail( "Should have thrown a GraphRuntimeException" );
+        }
+        catch ( GraphRuntimeException expected ) {
+            //intentionally ignored: the exception is the outcome under test
+        }
+
+        //now test something that passes.
+
+        final long minDelta = cacheTimeout * 2;
+
+        when( graphFig.getShardMinDelta() ).thenReturn( minDelta );
+
+        long returned = approximation.getMinTime();
+
+        long expectedReturned = time - minDelta;
+
+        assertEquals( expectedReturned, returned );
+
+        //a larger delta is accepted as well
+        final long delta = cacheTimeout * 4;
+
+        when( graphFig.getShardMinDelta() ).thenReturn( delta );
+
+        returned = approximation.getMinTime();
+
+        expectedReturned = time - delta;
+
+        assertEquals( expectedReturned, returned );
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9fa368e/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
index a898e5c..b08da9a 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
@@ -19,6 +19,9 @@
package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+import java.util.Collection;
+import java.util.Set;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -65,11 +68,11 @@ public class ShardEntryGroupTest {
ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
- boolean result = shardEntryGroup.addShard( firstShard );
+ boolean result = shardEntryGroup.addShard( secondShard );
assertTrue( "Shard added", result );
- result = shardEntryGroup.addShard( secondShard );
+ result = shardEntryGroup.addShard( firstShard );
assertTrue( " Shard added", result );
@@ -102,7 +105,7 @@ public class ShardEntryGroupTest {
ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
- boolean result = shardEntryGroup.addShard( compactedShard );
+ boolean result = shardEntryGroup.addShard( secondShard );
assertTrue( "Shard added", result );
@@ -111,7 +114,7 @@ public class ShardEntryGroupTest {
assertTrue( "Shard added", result );
- result = shardEntryGroup.addShard( secondShard );
+ result = shardEntryGroup.addShard( compactedShard );
assertTrue( " Shard added", result );
@@ -224,10 +227,195 @@ public class ShardEntryGroupTest {
shardEntryGroup.shouldCompact( secondShard.getCreatedTime() + delta ) );
- assertFalse( "Merge cannot be run within min time", shardEntryGroup.shouldCompact( secondShard.getCreatedTime() + delta + 1 ) );
+ assertFalse( "Merge cannot be run within min time",
+ shardEntryGroup.shouldCompact( secondShard.getCreatedTime() + delta + 1 ) );
assertTrue( "Merge should be run with after min time", shardEntryGroup.shouldCompact( firstShard.getCreatedTime() + delta + 1 ) );
}
+
+
+    /**
+     * Ensures that we read from all shards (even the compacted one): after adding two uncompacted
+     * shards and one compacted shard, getReadShards() must return exactly those three.
+     */
+    @Test
+    public void getAllReadShards() {
+
+        final long delta = 10000;
+
+        Shard firstShard = new Shard( 1000, 10000, false );
+
+        Shard secondShard = new Shard( 999, 9000, false );
+
+        Shard compactedShard1 = new Shard( 900, 8000, true );
+
+
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+        boolean result = shardEntryGroup.addShard( firstShard );
+
+        assertTrue( "Shard added", result );
+
+        result = shardEntryGroup.addShard( secondShard );
+
+        assertTrue( " Shard added", result );
+
+        result = shardEntryGroup.addShard( compactedShard1 );
+
+        assertTrue( "Shard added", result );
+
+        Collection<Shard> readShards = shardEntryGroup.getReadShards();
+
+        assertEquals( "Shard size correct", 3, readShards.size() );
+
+        //check each shard individually; previously all three assertions looked up firstShard
+        assertTrue( "First shard present", readShards.contains( firstShard ) );
+
+        assertTrue( "Second shard present", readShards.contains( secondShard ) );
+
+        assertTrue( "Third shard present", readShards.contains( compactedShard1 ) );
+
+    }
+
+
+    /**
+     * Verifies which shards are offered for writes. While the time passed to getWriteShards() is no
+     * later than the newest shard's created time plus delta, all three shards (including the compacted
+     * one) must be returned; one millisecond later only the compaction target is returned.
+     */
+    @Test
+    public void getAllWriteShardsNotPastCompaction() {
+
+        final long delta = 10000;
+
+        final Shard shardOne = new Shard( 1000, 10000, false );
+        final Shard shardTwo = new Shard( 999, 9000, false );
+        final Shard compacted = new Shard( 900, 8000, true );
+
+        final ShardEntryGroup group = new ShardEntryGroup( delta );
+
+        assertTrue( "Shard added", group.addShard( shardOne ) );
+        assertTrue( " Shard added", group.addShard( shardTwo ) );
+        assertTrue( "Shard added", group.addShard( compacted ) );
+
+        //the last entry is not the max created timestamp plus delta, so it still shouldn't return less
+        //than all shards
+        final long[] timesReturningAllShards = {
+                shardOne.getCreatedTime() + delta,
+                shardTwo.getCreatedTime() + delta,
+                shardTwo.getCreatedTime() + 1 + delta
+        };
+
+        for ( final long writeTime : timesReturningAllShards ) {
+            final Collection<Shard> writable = group.getWriteShards( writeTime );
+
+            assertEquals("Shard size correct", 3, writable.size());
+            assertTrue("First shard present", writable.contains( shardOne ) );
+            assertTrue("Second shard present", writable.contains( shardTwo ) );
+            assertTrue("Third shard present", writable.contains( compacted ) );
+        }
+
+        assertEquals("Compaction target correct", shardTwo, group.getCompactionTarget());
+
+        //one past the newest created time plus delta: only the compaction target remains writable
+        final Collection<Shard> afterWindow = group.getWriteShards( shardOne.getCreatedTime() + 1 + delta );
+
+        assertEquals("Shard size correct", 1, afterWindow.size());
+        assertTrue("Second shard present", afterWindow.contains( shardTwo ) );
+
+    }
+
+
+    /**
+     * Adding a shard whose first constructor value falls between two shards already in the group is
+     * expected to fail with an IllegalArgumentException.
+     *
+     * NOTE(review): the expected-exception attribute applies to the whole method, so an
+     * IllegalArgumentException thrown by either of the earlier addShard calls would also satisfy it.
+     */
+    @Test(expected=IllegalArgumentException.class)
+    public void failsInsertionOrder() {
+
+        final long delta = 10000;
+
+        final Shard upper = new Shard(20000, 10000, false);
+        final Shard middle = new Shard(10000 , 10000, false );
+        final Shard root = new Shard( 0, 0, false );
+
+        final ShardEntryGroup group = new ShardEntryGroup( delta );
+
+        assertTrue( "Shard added", group.addShard( upper ) );
+        assertTrue( "Shard added", group.addShard( root ) );
+
+        //20000 and 0 are already present, so 10000 would land in the middle; this call should blow up
+        group.addShard( middle );
+
+    }
+
+
+
+    /**
+     * Adds an uncompacted shard, then a compacted shard, then another uncompacted shard, all with the
+     * same created time. The first two additions must succeed and the third must be refused.
+     *
+     * NOTE(review): presumably the compacted shard closes the group to further entries - confirm
+     * against ShardEntryGroup.addShard.
+     */
+    @Test
+    public void shardEntryAddList() {
+
+        final long delta = 10000;
+
+        Shard highShard = new Shard( 30000, 1000, false );
+
+        Shard midShard = new Shard( 20000, 1000, true );
+
+        Shard lowShard = new Shard( 10000, 1000, false );
+
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( delta );
+
+        boolean result = shardEntryGroup.addShard( highShard );
+
+        assertTrue( "Shard added", result );
+
+        result = shardEntryGroup.addShard( midShard );
+
+        assertTrue( "Shard added", result );
+
+        result = shardEntryGroup.addShard( lowShard );
+
+        //message states the expectation, matching the other assertFalse calls in these tests
+        assertFalse( "Shard not added", result );
+    }
+
+
+
+
}