You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/05 00:14:57 UTC
[04/11] git commit: Checkpoint
Checkpoint
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7425ba5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7425ba5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7425ba5d
Branch: refs/heads/USERGRID-188
Commit: 7425ba5df6b79c40c7632824b42e1444b5c91780
Parents: 72f0d7d
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Jul 14 15:01:37 2014 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Jul 14 15:01:37 2014 -0600
----------------------------------------------------------------------
.../usergrid/persistence/graph/GraphFig.java | 10 +-
.../impl/EdgeSerializationImpl.java | 37 ++--
.../impl/shard/NodeShardApproximation.java | 11 +-
.../serialization/impl/shard/count/Counter.java | 8 +-
.../shard/count/NodeShardApproximationImpl.java | 169 ++++++++++++-------
.../impl/shard/count/ShardKey.java | 56 +++---
.../shard/impl/EdgeShardSerializationImpl.java | 2 +-
.../graph/serialization/util/EdgeHasher.java | 86 ++++++++++
.../graph/GraphManagerShardingIT.java | 13 +-
.../impl/shard/EdgeShardSerializationTest.java | 39 +++--
.../impl/shard/NodeShardAllocationTest.java | 36 ++--
.../shard/count/NodeShardApproximationTest.java | 79 +++++----
.../NodeShardCounterSerializationTest.java | 7 +-
13 files changed, 382 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index e0ce45c..f84fef4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -41,9 +41,11 @@ public interface GraphFig extends GuicyFig {
public static final String SHARD_CACHE_TIMEOUT = "usergrid.graph.shard.cache.timeout";
- public static final String COUNTER_WRITE_FLUSH_COUNT = "usergrid.graph.shard.counter.flush.count";
+ public static final String COUNTER_WRITE_FLUSH_COUNT = "usergrid.graph.shard.counter.beginFlush.count";
- public static final String COUNTER_WRITE_FLUSH_INTERVAL = "usergrid.graph.shard.counter.flush.interval";
+ public static final String COUNTER_WRITE_FLUSH_INTERVAL = "usergrid.graph.shard.counter.beginFlush.interval";
+
+ public static final String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size";
@@ -82,5 +84,9 @@ public interface GraphFig extends GuicyFig {
@Default( "30000" )
@Key( COUNTER_WRITE_FLUSH_INTERVAL )
long getCounterFlushInterval();
+
+ @Default( "1000" )
+ @Key(COUNTER_WRITE_FLUSH_QUEUE_SIZE )
+ int getCounterFlushQueueSize();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index 7c630c1..e49b99d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -55,6 +55,7 @@ import org.apache.usergrid.persistence.graph.SearchByIdType;
import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntries;
import org.apache.usergrid.persistence.graph.serialization.util.EdgeHasher;
@@ -211,9 +212,9 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
@Override
- public void countEdge( final Id rowId, final long shardId, final String... types ) {
+ public void countEdge( final Id rowId, final NodeType nodeType, final long shardId, final String... types ) {
if ( !isDeleted ) {
- edgeShardStrategy.increment( scope, rowId, shardId, 1l, types );
+ edgeShardStrategy.increment( scope, rowId, nodeType, shardId, 1l, types );
}
}
@@ -250,8 +251,8 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
@Override
- public void countEdge( final Id rowId, final long shardId, final String... types ) {
- edgeShardStrategy.increment( scope, rowId, shardId, -1, types );
+ public void countEdge( final Id rowId, final NodeType nodeType, final long shardId, final String... types ) {
+ edgeShardStrategy.increment( scope, rowId, nodeType, shardId, -1, types );
}
@@ -298,7 +299,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
final DirectedEdge sourceEdge = new DirectedEdge( targetNodeId, timestamp );
- final ShardEntries sourceRowKeyShard = edgeShardStrategy.getWriteShards( scope, sourceNodeId, timestamp, type );
+ final ShardEntries sourceRowKeyShard = edgeShardStrategy.getWriteShards( scope, sourceNodeId, NodeType.SOURCE, timestamp, type );
for ( Shard shard : sourceRowKeyShard.getEntries() ) {
@@ -306,12 +307,12 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
final long shardId = shard.getShardIndex();
final RowKey sourceRowKey = new RowKey( sourceNodeId, type, shardId );
op.writeEdge( sourceNodeEdgesCf, sourceRowKey, sourceEdge );
- op.countEdge( sourceNodeId, shardId, type );
+ op.countEdge( sourceNodeId, NodeType.SOURCE, shardId, type );
}
final ShardEntries sourceWithTypeRowKeyShard =
- edgeShardStrategy.getWriteShards( scope, sourceNodeId, timestamp, type, targetNodeType );
+ edgeShardStrategy.getWriteShards( scope, sourceNodeId, NodeType.SOURCE, timestamp, type, targetNodeType );
for ( Shard shard : sourceWithTypeRowKeyShard.getEntries() ) {
@@ -319,7 +320,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
final RowKeyType sourceRowKeyType = new RowKeyType( sourceNodeId, type, targetNodeId, shardId );
op.writeEdge( sourceNodeTargetTypeCf, sourceRowKeyType, sourceEdge );
- op.countEdge( sourceNodeId, shardId, type, targetNodeType );
+ op.countEdge( sourceNodeId, NodeType.SOURCE, shardId, type, targetNodeType );
}
@@ -330,19 +331,19 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
final DirectedEdge targetEdge = new DirectedEdge( sourceNodeId, timestamp );
- final ShardEntries targetRowKeyShard = edgeShardStrategy.getWriteShards( scope, targetNodeId, timestamp, type );
+ final ShardEntries targetRowKeyShard = edgeShardStrategy.getWriteShards( scope, targetNodeId, NodeType.TARGET, timestamp, type );
for ( Shard shard : targetRowKeyShard.getEntries() ) {
final long shardId = shard.getShardIndex();
final RowKey targetRowKey = new RowKey( targetNodeId, type, shardId );
op.writeEdge( targetNodeEdgesCf, targetRowKey, targetEdge );
- op.countEdge( targetNodeId, shardId, type );
+ op.countEdge( targetNodeId, NodeType.TARGET, shardId, type );
}
final ShardEntries targetWithTypeRowKeyShard =
- edgeShardStrategy.getWriteShards( scope, targetNodeId, timestamp, type, souceNodeType );
+ edgeShardStrategy.getWriteShards( scope, targetNodeId, NodeType.TARGET, timestamp, type, souceNodeType );
for ( Shard shard : targetWithTypeRowKeyShard.getEntries() ) {
@@ -353,7 +354,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
op.writeEdge( targetNodeSourceTypeCf, targetRowKeyType, targetEdge );
- op.countEdge( targetNodeId, shardId, type, souceNodeType );
+ op.countEdge( targetNodeId, NodeType.TARGET, shardId, type, souceNodeType );
}
/**
@@ -381,7 +382,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
final EdgeSearcher<EdgeRowKey, Long, MarkedEdge> searcher =
new EdgeSearcher<EdgeRowKey, Long, MarkedEdge>( scope, maxTimestamp, search.last(),
- edgeShardStrategy.getReadShards( scope, sourceId, maxTimestamp, type ) ) {
+ edgeShardStrategy.getReadShards( scope, sourceId, NodeType.SOURCE, maxTimestamp, type ) ) {
@Override
protected Serializer<Long> getSerializer() {
@@ -437,7 +438,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
final EdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
new EdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
- edgeShardStrategy.getReadShards( scope, sourceId, maxTimestamp, type ) ) {
+ edgeShardStrategy.getReadShards( scope, sourceId,NodeType.SOURCE, maxTimestamp, type ) ) {
@Override
@@ -482,7 +483,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
final EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
new EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
- edgeShardStrategy.getReadShards( scope, targetId, maxTimestamp, type, targetType ) ) {
+ edgeShardStrategy.getReadShards( scope, targetId, NodeType.TARGET, maxTimestamp, type, targetType ) ) {
@Override
protected Serializer<DirectedEdge> getSerializer() {
@@ -524,7 +525,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
final EdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
new EdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
- edgeShardStrategy.getReadShards( scope, targetId, maxTimestamp, type ) ) {
+ edgeShardStrategy.getReadShards( scope, targetId, NodeType.TARGET, maxTimestamp, type ) ) {
@Override
protected Serializer<DirectedEdge> getSerializer() {
@@ -570,7 +571,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
final EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
new EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
- edgeShardStrategy.getReadShards( scope, targetId, maxTimestamp, type, sourceType ) ) {
+ edgeShardStrategy.getReadShards( scope, targetId, NodeType.TARGET, maxTimestamp, type, sourceType ) ) {
@Override
protected Serializer<DirectedEdge> getSerializer() {
return EDGE_SERIALIZER;
@@ -994,7 +995,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
/**
* Perform the count on the edge
*/
- void countEdge( final Id rowId, long shardId, String... types );
+ void countEdge( final Id rowId, NodeType type, long shardId, String... types );
/**
* Write the edge into the version cf
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
index 311e605..f2f51ef 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
@@ -55,7 +55,14 @@ public interface NodeShardApproximation {
/**
- * Flush the current counters in the Approximation
+ * Flush the current counters in the Approximation. Will return immediately after the flush. You can then use flushPending
+ * to check the state.
*/
- public void flush();
+ public void beginFlush();
+
+ /**
+ * Return true if there is data to be flushed
+ * @return
+ */
+ public boolean flushPending();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
index 4318200..f5666a2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
@@ -24,6 +24,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.base.Preconditions;
+
/**
* This class is synchronized for addition. It is meant to be used across multiple threads
@@ -35,7 +37,7 @@ public class Counter {
private final AtomicLong invokeCounter;
/**
- * Pointer to our "current" counter map. We flush this when time expires or we hit our count
+ * Pointer to our "current" counter map. We beginFlush this when time expires or we hit our count
*/
private final ConcurrentHashMap<ShardKey, AtomicLong> counts;
@@ -94,6 +96,10 @@ public class Counter {
* @param other
*/
public void merge(final Counter other){
+
+ Preconditions.checkNotNull(other, "other cannot be null");
+ Preconditions.checkNotNull( other.counts, "other.counts cannot be null" );
+
for(Map.Entry<ShardKey, AtomicLong> entry: other.counts.entrySet()){
add(entry.getKey(), entry.getValue().get());
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
index c7197b5..e0740fa 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
@@ -20,10 +20,15 @@
package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.inject.Inject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -41,10 +46,12 @@ import rx.schedulers.Schedulers;
/**
* Implementation for doing edge approximation based on counters. Uses a guava loading cache to load values from
- * cassandra, and flush them on cache eviction.
+ * cassandra, and beginFlush them on cache eviction.
*/
public class NodeShardApproximationImpl implements NodeShardApproximation {
+ private static final Logger LOG = LoggerFactory.getLogger(NodeShardApproximationImpl.class);
+
/**
* Read write locks to ensure we atomically swap correctly
*/
@@ -64,7 +71,9 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
/**
* The counter that is currently in process of flushing to Cassandra. Can be null
*/
- private volatile Counter flushPending;
+ private final BlockingQueue<Counter> flushQueue;
+
+ private final FlushWorker worker;
/**
@@ -78,15 +87,21 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
this.nodeShardCounterSerialization = nodeShardCounterSerialization;
this.timeService = timeService;
this.currentCounter = new Counter();
+ this.flushQueue = new LinkedBlockingQueue<>( graphFig.getCounterFlushQueueSize() );
+
+ this.worker = new FlushWorker( this.flushQueue, nodeShardCounterSerialization );
+
+ Schedulers.newThread().createWorker().schedule( worker );
+
}
@Override
- public void increment( final ApplicationScope scope, final Id nodeId, final NodeType nodeType, final long shardId, final long count,
- final String... edgeType ) {
+ public void increment( final ApplicationScope scope, final Id nodeId, final NodeType nodeType, final long shardId,
+ final long count, final String... edgeType ) {
- final ShardKey key = new ShardKey( scope, nodeId, shardId, edgeType );
+ final ShardKey key = new ShardKey( scope, nodeId, nodeType, shardId, edgeType );
readLock.lock();
@@ -103,10 +118,10 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
@Override
- public long getCount( final ApplicationScope scope, final Id nodeId, final NodeType nodeType, final long shardId,
+ public long getCount( final ApplicationScope scope, final Id nodeId, final NodeType nodeType, final long shardId,
final String... edgeType ) {
- final ShardKey key = new ShardKey( scope, nodeId, shardId, edgeType );
+ final ShardKey key = new ShardKey( scope, nodeId, nodeType, shardId, edgeType );
readLock.lock();
@@ -116,9 +131,6 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
try {
count = currentCounter.get( key );
- if ( flushPending != null ) {
- count += flushPending.get( key );
- }
}
finally {
readLock.unlock();
@@ -131,78 +143,121 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
@Override
- public void flush() {
+ public void beginFlush() {
writeLockLock.lock();
try {
- flushPending = currentCounter;
- currentCounter = new Counter();
+
+ final boolean queued = flushQueue.offer( currentCounter );
+
+ /**
+ * We were able to q the beginFlush, swap it
+ */
+ if ( queued ) {
+ currentCounter = new Counter();
+ }
}
finally {
writeLockLock.unlock();
}
+ }
- //copy to the batch outside of the command for performance
- final MutationBatch batch = nodeShardCounterSerialization.flush( flushPending );
+ @Override
+ public boolean flushPending() {
+ return flushQueue.size() > 0 || worker.isFlushing();
+ }
- /**
- * Execute the command in hystrix to avoid slamming cassandra
- */
- new HystrixCommand( HystrixCassandra.ASYNC_GROUP ) {
- @Override
- protected Void run() throws Exception {
- /**
- * Execute the batch asynchronously
- */
- batch.execute();
+ /**
+ * Check if we need to beginFlush. If we do, perform the beginFlush
+ */
+ private void checkFlush() {
- return null;
- }
+ //there's no beginFlush pending and we're past the timeout or count
+ if ( currentCounter.getCreateTimestamp() + graphFig.getCounterFlushInterval() > timeService.getCurrentTime()
+ || currentCounter.getInvokeCount() >= graphFig.getCounterFlushCount() ) {
+ beginFlush();
+ }
+ }
- @Override
- protected Object getFallback() {
- //we've failed to mutate. Merge this count back into the current one
- currentCounter.merge( flushPending );
+ /**
+ * Worker that will take from the queue
+ */
+ private static class FlushWorker implements Action0 {
- return null;
- }
- }.execute();
+ private final BlockingQueue<Counter> counterQueue;
+ private final NodeShardCounterSerialization nodeShardCounterSerialization;
- writeLockLock.lock();
+ private volatile Counter rollUp;
- try {
- flushPending = null;
- }
- finally {
- writeLockLock.unlock();
+
+ private FlushWorker( final BlockingQueue<Counter> counterQueue,
+ final NodeShardCounterSerialization nodeShardCounterSerialization ) {
+ this.counterQueue = counterQueue;
+ this.nodeShardCounterSerialization = nodeShardCounterSerialization;
}
- }
- /**
- * Check if we need to flush. If we do, perform the flush
- */
- private void checkFlush() {
+ @Override
+ public void call() {
- //there's no flush pending and we're past the timeout or count
- if ( flushPending == null && (
- currentCounter.getCreateTimestamp() + graphFig.getCounterFlushInterval() > timeService.getCurrentTime()
- || currentCounter.getInvokeCount() >= graphFig.getCounterFlushCount() ) ) {
+ while ( true ) {
+ /**
+ * Block taking the first element. Once we take this, batch drain and roll up the rest
+ */
+
+ try {
+ rollUp = null;
+ rollUp = counterQueue.take();
+ }
+ catch ( InterruptedException e ) {
+ LOG.error( "Unable to read from counter queue", e );
+ throw new RuntimeException( "Unable to read from counter queue", e );
- /**
- * Fire the flush action asynchronously
- */
- Schedulers.immediate().createWorker().schedule( new Action0() {
- @Override
- public void call() {
- flush();
}
- } );
+
+
+
+
+ //copy to the batch outside of the command for performance
+ final MutationBatch batch = nodeShardCounterSerialization.flush( rollUp );
+
+ /**
+ * Execute the command in hystrix to avoid slamming cassandra
+ */
+ new HystrixCommand( HystrixCassandra.ASYNC_GROUP ) {
+
+ @Override
+ protected Void run() throws Exception {
+ batch.execute();
+
+ return null;
+ }
+
+
+ @Override
+ protected Object getFallback() {
+ //we've failed to mutate. Merge this count back into the current one
+ counterQueue.offer( rollUp );
+
+ return null;
+ }
+ }.execute();
+ }
+
+ }
+
+
+ /**
+ * Return true if we're in the process of flushing
+ * @return
+ */
+ public boolean isFlushing(){
+ return rollUp != null;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
index 63c87d3..55a761b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
import java.util.Arrays;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -32,14 +33,43 @@ public class ShardKey {
private final ApplicationScope scope;
private final Id nodeId;
private final long shardId;
+ private final NodeType nodeType;
private final String[] edgeTypes;
- public ShardKey( final ApplicationScope scope, final Id nodeId, final long shardId, final String... edgeTypes ) {
+ public ShardKey( final ApplicationScope scope, final Id nodeId, final NodeType nodeType, final long shardId, final String... edgeTypes ) {
this.scope = scope;
this.nodeId = nodeId;
this.shardId = shardId;
this.edgeTypes = edgeTypes;
+ this.nodeType = nodeType;
+ }
+
+
+
+
+ public ApplicationScope getScope() {
+ return scope;
+ }
+
+
+ public Id getNodeId() {
+ return nodeId;
+ }
+
+
+ public long getShardId() {
+ return shardId;
+ }
+
+
+ public String[] getEdgeTypes() {
+ return edgeTypes;
+ }
+
+
+ public NodeType getNodeType() {
+ return nodeType;
}
@@ -63,6 +93,9 @@ public class ShardKey {
if ( !nodeId.equals( shardKey.nodeId ) ) {
return false;
}
+ if ( nodeType != shardKey.nodeType ) {
+ return false;
+ }
if ( !scope.equals( shardKey.scope ) ) {
return false;
}
@@ -71,31 +104,12 @@ public class ShardKey {
}
- public ApplicationScope getScope() {
- return scope;
- }
-
-
- public Id getNodeId() {
- return nodeId;
- }
-
-
- public long getShardId() {
- return shardId;
- }
-
-
- public String[] getEdgeTypes() {
- return edgeTypes;
- }
-
-
@Override
public int hashCode() {
int result = scope.hashCode();
result = 31 * result + nodeId.hashCode();
result = 31 * result + ( int ) ( shardId ^ ( shardId >>> 32 ) );
+ result = 31 * result + nodeType.hashCode();
result = 31 * result + Arrays.hashCode( edgeTypes );
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
index 1e6614a..5b7c901 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
@@ -104,7 +104,7 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
final MutationBatch batch = keyspace.prepareMutationBatch();
- batch.withRow( EDGE_SHARDS, rowKey ).putColumn( shard, HOLDER ).setTimestamp( timestamp );
+ batch.withTimestamp( timestamp ).withRow( EDGE_SHARDS, rowKey ).putColumn( shard, HOLDER );
return batch;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/util/EdgeHasher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/util/EdgeHasher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/util/EdgeHasher.java
new file mode 100644
index 0000000..14a67fa
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/util/EdgeHasher.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.util;
+
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+
+import org.apache.cassandra.utils.MurmurHash;
+
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ *
+ *
+ */
+public class EdgeHasher {
+
+ private static final String UTF_8 = "UTF-8";
+ private static final Charset CHARSET = Charset.forName( UTF_8 );
+
+
+ /**
+ * Create a hash based on the edge type and the type of the id that will be inserted into the column
+ *
+ *
+ * @param edgeType The name of the edge type
+ * @param idForColumn The id of the value that will be in the column
+ *
+ * @return A hash that represents a consistent one way hash of the fields
+ */
+ public static long[] createEdgeHash( final String edgeType, final Id idForColumn ) {
+
+ return createEdgeHash( edgeType, idForColumn.getType() );
+ }
+
+
+ /**
+ * Create the edge hash from the edge type and id type
+ * @param edgeTypes
+ * @return
+ */
+ public static long[] createEdgeHash(final String... edgeTypes){
+ final StringBuilder hashString = new StringBuilder();
+
+ for(String edge: edgeTypes){
+ hashString.append(edge);
+ }
+
+ return createEdgeHash( hashString.toString() );
+ }
+
+
+ /**
+ * Create a ash based on the edge type and the type of the id that will be inserted into the column
+ *
+ * @return A hash that represents a consistent one way hash of the fields
+ */
+ public static long[] createEdgeHash( final String edgeType ) {
+
+
+ ByteBuffer key = ByteBuffer.wrap( edgeType.getBytes( CHARSET ) );
+
+ return MurmurHash.hash3_x64_128( key, key.position(), key.remaining(), 0 );
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
index 01e07b1..adcb42e 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
@@ -35,6 +35,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
@@ -101,7 +102,7 @@ public class GraphManagerShardingIT {
//each edge causes 4 counts
final long writeCount = flushCount/4;
- assertTrue( "Shard size must be >= flush Count", maxShardSize >= flushCount );
+ assertTrue( "Shard size must be >= beginFlush Count", maxShardSize >= flushCount );
Id targetId = null;
@@ -115,14 +116,14 @@ public class GraphManagerShardingIT {
}
- long shardCount = nodeShardApproximation.getCount( scope, sourceId, 0l, edgeType );
+ long shardCount = nodeShardApproximation.getCount( scope, sourceId, NodeType.SOURCE, 0l, edgeType );
assertEquals("Shard count for source node should be the same as write count", writeCount, shardCount);
//now verify it's correct for the target
- shardCount = nodeShardApproximation.getCount( scope, targetId, 0l, edgeType );
+ shardCount = nodeShardApproximation.getCount( scope, targetId, NodeType.TARGET, 0l, edgeType );
assertEquals(1, shardCount);
@@ -151,7 +152,7 @@ public class GraphManagerShardingIT {
//each edge causes 4 counts
final long writeCount = flushCount/4;
- assertTrue( "Shard size must be >= flush Count", maxShardSize >= flushCount );
+ assertTrue( "Shard size must be >= beginFlush Count", maxShardSize >= flushCount );
Id sourceId = null;
@@ -165,14 +166,14 @@ public class GraphManagerShardingIT {
}
- long shardCount = nodeShardApproximation.getCount( scope, targetId, 0l, edgeType );
+ long shardCount = nodeShardApproximation.getCount( scope, targetId, NodeType.TARGET, 0l, edgeType );
assertEquals("Shard count for source node should be the same as write count", writeCount, shardCount);
//now verify it's correct for the target
- shardCount = nodeShardApproximation.getCount( scope, sourceId, 0l, edgeType );
+ shardCount = nodeShardApproximation.getCount( scope, sourceId, NodeType.SOURCE, 0l, edgeType );
assertEquals(1, shardCount);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
index 6135a2d..937a51a 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
@@ -96,16 +96,16 @@ public class EdgeShardSerializationTest {
String[] types = { "edgeType", "subType" };
- MutationBatch batch = edgeShardSerialization.writeEdgeMeta( scope, now, slice1, timestamp, types );
+ MutationBatch batch = edgeShardSerialization.writeEdgeMeta( scope, now, NodeType.SOURCE, slice1, timestamp, types );
- batch.mergeShallow( edgeShardSerialization.writeEdgeMeta( scope, now, slice2, timestamp, types ) );
+ batch.mergeShallow( edgeShardSerialization.writeEdgeMeta( scope, now, NodeType.SOURCE, slice2, timestamp, types ) );
- batch.mergeShallow( edgeShardSerialization.writeEdgeMeta( scope, now, slice3, timestamp, types ) );
+ batch.mergeShallow( edgeShardSerialization.writeEdgeMeta( scope, now, NodeType.SOURCE, slice3, timestamp, types ) );
batch.execute();
- Iterator<Shard> results = edgeShardSerialization.getEdgeMetaData( scope, now, Optional.<Shard>absent(), types );
+ Iterator<Shard> results = edgeShardSerialization.getEdgeMetaData( scope, now, NodeType.SOURCE, Optional.<Shard>absent(), types );
Shard next = results.next();
@@ -129,8 +129,14 @@ public class EdgeShardSerializationTest {
assertFalse( results.hasNext() );
+ //test we get nothing with the other node type
+ results = edgeShardSerialization.getEdgeMetaData( scope, now, NodeType.TARGET, Optional.<Shard>absent(), types );
+
+ assertFalse(results.hasNext());
+
+
//test paging and size
- results = edgeShardSerialization.getEdgeMetaData( scope, now, Optional.of( new Shard( slice2, 0l ) ), types );
+ results = edgeShardSerialization.getEdgeMetaData( scope, now, NodeType.SOURCE, Optional.of( new Shard( slice2, 0l ) ), types );
next = results.next();
@@ -162,16 +168,16 @@ public class EdgeShardSerializationTest {
String[] types = { "edgeType", "subType" };
- MutationBatch batch = edgeShardSerialization.writeEdgeMeta( scope, now, slice1, timestamp, types );
+ MutationBatch batch = edgeShardSerialization.writeEdgeMeta( scope, now, NodeType.SOURCE, slice1, timestamp, types );
- batch.mergeShallow( edgeShardSerialization.writeEdgeMeta( scope, now, slice2, timestamp,types ) );
+ batch.mergeShallow( edgeShardSerialization.writeEdgeMeta( scope, now, NodeType.SOURCE, slice2, timestamp,types ) );
- batch.mergeShallow( edgeShardSerialization.writeEdgeMeta( scope, now, slice3, timestamp, types ) );
+ batch.mergeShallow( edgeShardSerialization.writeEdgeMeta( scope, now, NodeType.SOURCE, slice3, timestamp, types ) );
batch.execute();
- Iterator<Shard> results = edgeShardSerialization.getEdgeMetaData( scope, now, Optional.<Shard>absent(), types );
+ Iterator<Shard> results = edgeShardSerialization.getEdgeMetaData( scope, now, NodeType.SOURCE, Optional.<Shard>absent(), types );
assertEquals( slice3, results.next().getShardIndex() );
@@ -181,10 +187,15 @@ public class EdgeShardSerializationTest {
assertFalse( results.hasNext() );
+ //test nothing with other type
+ results = edgeShardSerialization.getEdgeMetaData( scope, now, NodeType.TARGET, Optional.<Shard>absent(), types );
+
+ assertFalse(results.hasNext());
+
//test paging and size
- edgeShardSerialization.removeEdgeMeta( scope, now, slice1, types ).execute();
+ edgeShardSerialization.removeEdgeMeta( scope, now, NodeType.SOURCE, slice1, types ).execute();
- results = edgeShardSerialization.getEdgeMetaData( scope, now,Optional.<Shard>absent(), types );
+ results = edgeShardSerialization.getEdgeMetaData( scope, now, NodeType.SOURCE, Optional.<Shard>absent(), types );
assertEquals( slice3, results.next().getShardIndex() );
@@ -193,11 +204,11 @@ public class EdgeShardSerializationTest {
assertFalse( results.hasNext() );
- edgeShardSerialization.removeEdgeMeta( scope, now, slice2, types ).execute();
+ edgeShardSerialization.removeEdgeMeta( scope, now, NodeType.SOURCE, slice2, types ).execute();
- edgeShardSerialization.removeEdgeMeta( scope, now, slice3, types ).execute();
+ edgeShardSerialization.removeEdgeMeta( scope, now, NodeType.SOURCE, slice3, types ).execute();
- results = edgeShardSerialization.getEdgeMetaData( scope, now, Optional.<Shard>absent(), types );
+ results = edgeShardSerialization.getEdgeMetaData( scope, now, NodeType.SOURCE, Optional.<Shard>absent(), types );
assertFalse( results.hasNext() );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index 7546531..383e32c 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -44,6 +44,7 @@ import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.crea
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -136,15 +137,18 @@ public class NodeShardAllocationTest {
* Mock up returning an empty iterator, our audit shouldn't create a new shard
*/
when( edgeShardSerialization
- .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ), same( type ),
+ .getEdgeMetaData( same( scope ), same( nodeId ), eq( NodeType.SOURCE ), any( Optional.class ), same( type ),
same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
- final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+ final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.SOURCE, type, subType );
assertFalse( "No shard allocated", result );
}
+
+
+
@Test
public void existingFutureShardSameTime() {
final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
@@ -182,10 +186,10 @@ public class NodeShardAllocationTest {
* Mock up returning a min shard, and a future shard
*/
when( edgeShardSerialization
- .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ), same( type ),
+ .getEdgeMetaData( same( scope ), same( nodeId ), eq( NodeType.TARGET), any( Optional.class ), same( type ),
same( subType ) ) ).thenReturn( Arrays.asList( futureShard ).iterator() );
- final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+ final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.TARGET, type, subType );
assertFalse( "No shard allocated", result );
}
@@ -226,7 +230,7 @@ public class NodeShardAllocationTest {
* Mock up returning a min shard, and a future shard
*/
when( edgeShardSerialization
- .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ), same( type ),
+ .getEdgeMetaData( same( scope ), same( nodeId ), eq(NodeType.TARGET), any( Optional.class ), same( type ),
same( subType ) ) ).thenReturn( Arrays.asList( new Shard(0l, 0l) ).iterator() );
@@ -234,9 +238,9 @@ public class NodeShardAllocationTest {
final long count = graphFig.getShardSize() - 1;
- when( nodeShardApproximation.getCount(scope, nodeId, 0l, type, subType )).thenReturn( count );
+ when( nodeShardApproximation.getCount(scope, nodeId, NodeType.TARGET, 0l, type, subType )).thenReturn( count );
- final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+ final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.TARGET, type, subType );
assertFalse( "Shard allocated", result );
}
@@ -277,7 +281,7 @@ public class NodeShardAllocationTest {
* Mock up returning a min shard
*/
when( edgeShardSerialization
- .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ), same( type ),
+ .getEdgeMetaData( same( scope ), same( nodeId ), eq(NodeType.SOURCE), any( Optional.class ), same( type ),
same( subType ) ) ).thenReturn( Arrays.asList( new Shard(0l, 0l) ).iterator() );
@@ -285,7 +289,7 @@ public class NodeShardAllocationTest {
//return a shard size equal to our max
when( nodeShardApproximation
- .getCount( scope , nodeId, 0l,type , subType ))
+ .getCount( scope , nodeId, NodeType.SOURCE, 0l,type , subType ))
.thenReturn( shardCount );
ArgumentCaptor<Long> shardValue = ArgumentCaptor.forClass( Long.class );
@@ -294,11 +298,11 @@ public class NodeShardAllocationTest {
//mock up our mutation
when( edgeShardSerialization
- .writeEdgeMeta( same( scope ), same( nodeId ), shardValue.capture(), timestampValue.capture(), same( type ), same( subType ) ) )
+ .writeEdgeMeta( same( scope ), same( nodeId ), eq(NodeType.SOURCE), shardValue.capture(), timestampValue.capture(), same( type ), same( subType ) ) )
.thenReturn( mock( MutationBatch.class ) );
- final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
+ final boolean result = approximation.auditMaxShard( scope, nodeId, NodeType.SOURCE, type, subType );
assertTrue( "Shard allocated", result );
@@ -379,7 +383,7 @@ public class NodeShardAllocationTest {
* Mock up returning a min shard
*/
when( edgeShardSerialization
- .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ), same( type ),
+ .getEdgeMetaData( same( scope ), same( nodeId ), eq(NodeType.TARGET), any( Optional.class ), same( type ),
same( subType ) ) ).thenReturn( Arrays.asList(futureShard3, futureShard2, futureShard1, minShard).iterator() );
@@ -391,12 +395,12 @@ public class NodeShardAllocationTest {
//mock up our mutation
when( edgeShardSerialization
- .removeEdgeMeta( same( scope ), same( nodeId ), newLongValue.capture(), same( type ), same( subType ) ) )
+ .removeEdgeMeta( same( scope ), same( nodeId ), eq(NodeType.TARGET), newLongValue.capture(), same( type ), same( subType ) ) )
.thenReturn( mock( MutationBatch.class ) );
final Iterator<Shard>
- result = approximation.getSourceShards( scope, nodeId, Optional.<Shard>absent(), type, subType );
+ result = approximation.getShards( scope, nodeId, NodeType.TARGET, Optional.<Shard>absent(), type, subType );
assertTrue( "Shards present", result.hasNext() );
@@ -453,10 +457,10 @@ public class NodeShardAllocationTest {
* Mock up returning an empty iterator, our audit shouldn't create a new shard
*/
when( edgeShardSerialization
- .getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ), same( type ),
+ .getEdgeMetaData( same( scope ), same( nodeId ), eq(NodeType.TARGET), any( Optional.class ), same( type ),
same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
- final Iterator<Shard> result = approximation.getSourceShards( scope, nodeId, Optional.<Shard>absent(), type,
+ final Iterator<Shard> result = approximation.getShards( scope, nodeId, NodeType.TARGET, Optional.<Shard>absent(), type,
subType );
assertEquals("0 shard allocated", 0l, result.next().getShardIndex());
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
index da19ce5..51448b7 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
@@ -39,12 +39,15 @@ import org.junit.Test;
import org.safehaus.guicyfig.Bypass;
import org.safehaus.guicyfig.OptionState;
import org.safehaus.guicyfig.Overrides;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
@@ -68,6 +71,7 @@ import static org.mockito.Mockito.when;
public class NodeShardApproximationTest {
+ private static final Logger LOG = LoggerFactory.getLogger( NodeShardApproximation.class );
private GraphFig graphFig;
@@ -92,25 +96,24 @@ public class NodeShardApproximationTest {
when( graphFig.getShardCacheSize() ).thenReturn( 10000l );
when( graphFig.getShardSize() ).thenReturn( 250000l );
+ when( graphFig.getCounterFlushQueueSize() ).thenReturn( 10000 );
nodeShardCounterSerialization = mock( NodeShardCounterSerialization.class );
- when(nodeShardCounterSerialization.flush( any(Counter.class) )).thenReturn( mock( MutationBatch.class) );
-
+ when( nodeShardCounterSerialization.flush( any( Counter.class ) ) ).thenReturn( mock( MutationBatch.class ) );
timeService = mock( TimeService.class );
- when(timeService.getCurrentTime()).thenReturn( System.currentTimeMillis() );
+ when( timeService.getCurrentTime() ).thenReturn( System.currentTimeMillis() );
}
@Test
- public void testSingleShard() {
-
+ public void testSingleShard() throws InterruptedException {
- when(graphFig.getCounterFlushCount()).thenReturn( 100000l );
+ when( graphFig.getCounterFlushCount() ).thenReturn( 100000l );
NodeShardApproximation approximation =
new NodeShardApproximationImpl( graphFig, nodeShardCounterSerialization, timeService );
@@ -120,7 +123,9 @@ public class NodeShardApproximationTest {
final String type = "type";
final String type2 = "subType";
- long count = approximation.getCount( scope, id, shardId, type, type2 );
+ long count = approximation.getCount( scope, id, NodeType.TARGET, shardId, type, type2 );
+
+ waitForFlush( approximation );
assertEquals( 0, count );
}
@@ -130,8 +135,6 @@ public class NodeShardApproximationTest {
public void testSingleShardMultipleThreads() throws ExecutionException, InterruptedException {
-
-
NodeShardCounterSerialization serialization = new TestNodeShardCounterSerialization();
final NodeShardApproximation approximation =
@@ -158,7 +161,7 @@ public class NodeShardApproximationTest {
public Long call() throws Exception {
for ( int i = 0; i < increments; i++ ) {
- approximation.increment( scope, id, shardId, 1, type, type2 );
+ approximation.increment( scope, id, NodeType.TARGET, shardId, 1, type, type2 );
}
return 0l;
@@ -169,24 +172,25 @@ public class NodeShardApproximationTest {
}
-
for ( Future<Long> future : futures ) {
- future.get();
+ future.get();
}
-
+ waitForFlush( approximation );
//get our count. It should be accurate b/c we only have 1 instance
- final long returnedCount = approximation.getCount( scope, id, shardId, type, type2);
+ final long returnedCount = approximation.getCount( scope, id, NodeType.TARGET, shardId, type, type2 );
final long expected = workers * increments;
- assertEquals(expected, returnedCount);
-
+ assertEquals( expected, returnedCount );
+ //test we get nothing with the other type
+ final long emptyCount = approximation.getCount( scope, id, NodeType.SOURCE, shardId, type, type2 );
+ assertEquals( 0, emptyCount );
}
@@ -195,8 +199,6 @@ public class NodeShardApproximationTest {
public void testMultipleShardMultipleThreads() throws ExecutionException, InterruptedException {
-
-
NodeShardCounterSerialization serialization = new TestNodeShardCounterSerialization();
final NodeShardApproximation approximation =
@@ -210,8 +212,7 @@ public class NodeShardApproximationTest {
final String type = "type";
final String type2 = "subType";
- final AtomicLong shardIdCounter = new AtomicLong( );
-
+ final AtomicLong shardIdCounter = new AtomicLong();
ExecutorService executor = Executors.newFixedThreadPool( workers );
@@ -227,7 +228,7 @@ public class NodeShardApproximationTest {
final long threadShardId = shardIdCounter.incrementAndGet();
for ( int i = 0; i < increments; i++ ) {
- approximation.increment( scope, id, threadShardId, 1, type, type2 );
+ approximation.increment( scope, id, NodeType.SOURCE, threadShardId, 1, type, type2 );
}
return threadShardId;
@@ -238,29 +239,41 @@ public class NodeShardApproximationTest {
}
-
for ( Future<Long> future : futures ) {
- final long shardId = future.get();
+ final long shardId = future.get();
+
+ waitForFlush( approximation );
- final long returnedCount = approximation.getCount( scope, id, shardId, type, type2);
+ final long returnedCount = approximation.getCount( scope, id, NodeType.SOURCE, shardId, type, type2 );
- assertEquals(increments, returnedCount);
+ assertEquals( increments, returnedCount );
}
+ }
+ private void waitForFlush( NodeShardApproximation approximation ) throws InterruptedException {
+ approximation.beginFlush();
+ while ( approximation.flushPending() ) {
+
+ LOG.info("Waiting on beginFlush to complete");
+
+ Thread.sleep( 100 );
+ }
}
+
/**
* These are created b/c we can't use Mockito. It OOM's with keeping track of all the mock invocations
*/
- private static class TestNodeShardCounterSerialization implements NodeShardCounterSerialization{
+ private static class TestNodeShardCounterSerialization implements NodeShardCounterSerialization {
private Counter copy = new Counter();
+
@Override
public MutationBatch flush( final Counter counter ) {
copy.merge( counter );
@@ -281,7 +294,6 @@ public class NodeShardApproximationTest {
}
-
/**
* Simple test mutation to no-op during tests
*/
@@ -415,14 +427,14 @@ public class NodeShardApproximationTest {
}
-
- private static class TestGraphFig implements GraphFig{
+ private static class TestGraphFig implements GraphFig {
@Override
public int getScanPageSize() {
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
+
@Override
public int getRepairConcurrentSize() {
return 0; //To change body of implemented methods use File | Settings | File Templates.
@@ -460,6 +472,12 @@ public class NodeShardApproximationTest {
@Override
+ public int getCounterFlushQueueSize() {
+ return 10000;
+ }
+
+
+ @Override
public void addPropertyChangeListener( final PropertyChangeListener propertyChangeListener ) {
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -555,7 +573,8 @@ public class NodeShardApproximationTest {
}
}
- private static class TestTimeService implements TimeService{
+
+ private static class TestTimeService implements TimeService {
@Override
public long getCurrentTime() {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7425ba5d/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java
index 9968f67..aad918e 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java
@@ -35,6 +35,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
@@ -96,11 +97,11 @@ public class NodeShardCounterSerializationTest {
final Id id = createId( "test" );
- ShardKey key1 = new ShardKey( scope, id, 0, "type1" );
+ ShardKey key1 = new ShardKey( scope, id, NodeType.SOURCE, 0, "type1" );
- ShardKey key2 = new ShardKey( scope, id, 0, "type2" );
+ ShardKey key2 = new ShardKey( scope, id, NodeType.SOURCE, 0, "type2" );
- ShardKey key3 = new ShardKey( scope, id, 1, "type1" );
+ ShardKey key3 = new ShardKey( scope, id, NodeType.SOURCE, 1, "type1" );
Counter counter = new Counter();