You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by md...@apache.org on 2015/11/06 00:43:45 UTC
[3/5] usergrid git commit: Removes approximation logic on shards to
ensure that we're auditing the actual shard value.
Removes approximation logic on shards to ensure that we're auditing the actual shard value.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/591a2f1f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/591a2f1f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/591a2f1f
Branch: refs/heads/2.1-release
Commit: 591a2f1fd97b7259cf72d10855f2832b8342b5a4
Parents: 9c54de3
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Nov 5 14:37:39 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 5 14:40:08 2015 -0700
----------------------------------------------------------------------
.../persistence/graph/guice/GraphModule.java | 7 -
.../impl/shard/EdgeShardStrategy.java | 10 +-
.../impl/shard/NodeShardApproximation.java | 66 --
.../serialization/impl/shard/count/Counter.java | 131 ----
.../shard/count/NodeShardApproximationImpl.java | 272 --------
.../count/NodeShardCounterSerialization.java | 48 --
.../NodeShardCounterSerializationImpl.java | 186 ------
.../impl/shard/count/ShardKey.java | 75 ---
.../shard/impl/NodeShardAllocationImpl.java | 19 +-
.../impl/shard/impl/ShardGroupDeletionImpl.java | 7 +
.../impl/ShardedEdgeSerializationImpl.java | 28 -
.../shard/impl/SizebasedEdgeShardStrategy.java | 13 +-
.../graph/GraphManagerShardingIT.java | 208 ------
.../impl/shard/NodeShardAllocationTest.java | 48 +-
.../shard/count/NodeShardApproximationTest.java | 627 -------------------
.../NodeShardCounterSerializationTest.java | 124 ----
.../shard/impl/ShardGroupDeletionImplTest.java | 3 +
17 files changed, 25 insertions(+), 1847 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index d2476eb..cf0ffcb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -59,14 +59,10 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumn
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupDeletion;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardApproximationImpl;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerializationImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.AsyncTaskExecutorImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeShardSerializationImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl;
@@ -114,9 +110,7 @@ public abstract class GraphModule extends AbstractModule {
*/
bind(NodeShardAllocation.class).to( NodeShardAllocationImpl.class );
- bind( NodeShardApproximation.class ).to( NodeShardApproximationImpl.class );
bind( NodeShardCache.class ).to( NodeShardCacheImpl.class );
- bind( NodeShardCounterSerialization.class ).to( NodeShardCounterSerializationImpl.class );
/**
* Binding for task tracker
@@ -182,7 +176,6 @@ public abstract class GraphModule extends AbstractModule {
migrationBinding.addBinding().to( Key.get( EdgeColumnFamilies.class ) );
migrationBinding.addBinding().to( Key.get( EdgeShardSerialization.class ) );
- migrationBinding.addBinding().to( Key.get( NodeShardCounterSerialization.class ) );
//Get the old version and the new one
migrationBinding.addBinding().to( Key.get( EdgeMetadataSerializationV1Impl.class) );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
index 1e02a72..803e31e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
@@ -44,15 +44,7 @@ public interface EdgeShardStrategy {
*/
public Iterator<ShardEntryGroup> getReadShards(final ApplicationScope scope, final long maxTimestamp, final DirectedEdgeMeta directedEdgeMeta );
- /**
- * Increment our count meta data by the passed value. Can be a positive or a negative number.
- * @param scope The scope in the application
- * @param shard The shard to use
- * @param count The amount to increment or decrement
- * @param directedEdgeMeta The edge meta data to use
- * @return
- */
- public void increment(final ApplicationScope scope, Shard shard, long count, final DirectedEdgeMeta directedEdgeMeta );
+
http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
deleted file mode 100644
index fc39e56..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.serialization.impl.shard;
-
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-
-/**
- * Interface for creating approximate estimates of shards
- */
-public interface NodeShardApproximation {
-
-
- /**
- * Increment the shard Id the specified amount
- *
- * @param scope The scope
- * @param shard The shard to use
- * @param count The count to increment
- * @param directedEdgeMeta The directed edge meta data to use
- */
- public void increment( final ApplicationScope scope, final Shard shard,
- final long count, final DirectedEdgeMeta directedEdgeMeta );
-
-
- /**
- * Get the approximation of the number of unique items
- *
- * @param scope The scope
- * @param directedEdgeMeta The directed edge meta data to use
- */
- public long getCount( final ApplicationScope scope, final Shard shard, final DirectedEdgeMeta directedEdgeMeta );
-
-
- /**
- * Flush the current counters in the Approximation. Will return immediately after the flush. You can then use flushPending
- * to check the state.
- */
- public void beginFlush();
-
- /**
- * Return true if there is data to be flushed
- * @return
- */
- public boolean flushPending();
-
-
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
deleted file mode 100644
index f5666a2..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
-
-
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.base.Preconditions;
-
-
-/**
- * This class is synchronized for addition. It is meant to be used across multiple threads
- */
-public class Counter {
- /**
- * The counter to tell us how often it was invoked
- */
- private final AtomicLong invokeCounter;
-
- /**
- * Pointer to our "current" counter map. We beginFlush this when time expires or we hit our count
- */
- private final ConcurrentHashMap<ShardKey, AtomicLong> counts;
-
- /**
- * The timestamp the concurrent map was created
- */
- private final long createTimestamp;
-
-
- /**
- * Implementation of the internal counters
- */
- public Counter() {
- this.createTimestamp = System.currentTimeMillis();
- this.invokeCounter = new AtomicLong();
- this.counts = new ConcurrentHashMap<>();
- }
-
-
- /**
- * Add the count to the key.
- */
- public void add( final ShardKey key, final long count ) {
- AtomicLong counter = counts.get( key );
-
- if ( counter == null ) {
- counter = new AtomicLong();
- AtomicLong existingCounter = counts.putIfAbsent( key, counter );
-
- if ( existingCounter != null ) {
- counter = existingCounter;
- }
- }
-
- counter.addAndGet( count );
- invokeCounter.incrementAndGet();
- }
-
-
- /**
- * Get the current valye from the cache
- */
- public long get( final ShardKey key ) {
- AtomicLong counter = counts.get( key );
-
- if ( counter == null ) {
- return 0;
- }
-
- return counter.get();
- }
-
-
- /**
- * Deep copy the counts from other into this counter
- * @param other
- */
- public void merge(final Counter other){
-
- Preconditions.checkNotNull(other, "other cannot be null");
- Preconditions.checkNotNull( other.counts, "other.counts cannot be null" );
-
- for(Map.Entry<ShardKey, AtomicLong> entry: other.counts.entrySet()){
- add(entry.getKey(), entry.getValue().get());
- }
- }
-
-
- /**
- * Get all entries
- * @return
- */
- public Set<Map.Entry<ShardKey, AtomicLong>> getEntries(){
- return counts.entrySet();
- }
-
-
- /**
- * Get the count of the number of times we've been incremented
- * @return
- */
- public long getInvokeCount() {
- return invokeCounter.get();
- }
-
-
-
- public long getCreateTimestamp() {
- return createTimestamp;
- }
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
deleted file mode 100644
index fceb32c..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
-
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import javax.inject.Inject;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.core.consistency.TimeService;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.hystrix.HystrixCommand;
-import com.netflix.hystrix.HystrixCommandGroupKey;
-import com.netflix.hystrix.HystrixThreadPoolProperties;
-
-import rx.functions.Action0;
-import rx.schedulers.Schedulers;
-
-
-/**
- * Implementation for doing edge approximation based on counters. Uses a guava loading cache to load values from
- * cassandra, and beginFlush them on cache eviction.
- */
-public class NodeShardApproximationImpl implements NodeShardApproximation {
-
- private static final Logger LOG = LoggerFactory.getLogger(NodeShardApproximationImpl.class);
-
- /**
- * Read write locks to ensure we atomically swap correctly
- */
- private final ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
- private final ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
- private final ReentrantReadWriteLock.WriteLock writeLockLock = reentrantReadWriteLock.writeLock();
-
- private final GraphFig graphFig;
- private final NodeShardCounterSerialization nodeShardCounterSerialization;
- private final TimeService timeService;
-
- /**
- * Counter currently implemented
- */
- private volatile Counter currentCounter;
-
- /**
- * The counter that is currently in process of flushing to Cassandra. Can be null
- */
- private final BlockingQueue<Counter> flushQueue;
-
- private final FlushWorker worker;
-
- /**
- * Command group used for realtime user commands
- */
- public static final HystrixCommand.Setter
- COUNT_GROUP = HystrixCommand.Setter.withGroupKey(
- HystrixCommandGroupKey.Factory.asKey( "BatchCounterRollup" ) ).andThreadPoolPropertiesDefaults(
- HystrixThreadPoolProperties.Setter().withCoreSize( 100 ) );
-
-
- /**
- * Create a time shard approximation with the correct configuration.
- */
- @Inject
- public NodeShardApproximationImpl( final GraphFig graphFig,
- final NodeShardCounterSerialization nodeShardCounterSerialization,
- final TimeService timeService ) {
- this.graphFig = graphFig;
- this.nodeShardCounterSerialization = nodeShardCounterSerialization;
- this.timeService = timeService;
- this.currentCounter = new Counter();
- this.flushQueue = new LinkedBlockingQueue<>( graphFig.getCounterFlushQueueSize() );
-
- this.worker = new FlushWorker( this.flushQueue, nodeShardCounterSerialization );
-
- Schedulers.newThread().createWorker().schedule( worker );
-
- }
-
-
- @Override
- public void increment(
- final ApplicationScope scope, final Shard shard,
- final long count, final DirectedEdgeMeta directedEdgeMeta ) {
-
-
- final ShardKey key = new ShardKey( scope, shard, directedEdgeMeta );
-
- readLock.lock();
-
-
- try {
- currentCounter.add( key, count );
- }
- finally {
- readLock.unlock();
- }
-
- checkFlush();
- }
-
-
- @Override
- public long getCount( final ApplicationScope scope, final Shard shard, final DirectedEdgeMeta directedEdgeMeta ) {
-
- final ShardKey key = new ShardKey( scope, shard, directedEdgeMeta );
-
-
- readLock.lock();
-
- long count;
-
- try {
- count = currentCounter.get( key );
-
- }
- finally {
- readLock.unlock();
- }
-
-
- //read from Cassandra and add to get a "close enough" number
- return count + nodeShardCounterSerialization.getCount( key );
- }
-
-
- @Override
- public void beginFlush() {
-
- writeLockLock.lock();
-
- try {
-
- final boolean queued = flushQueue.offer( currentCounter );
-
- /**
- * We were able to q the beginFlush, swap it
- */
- if ( queued ) {
- currentCounter = new Counter();
- }
- }
- finally {
- writeLockLock.unlock();
- }
- }
-
-
- @Override
- public boolean flushPending() {
- return flushQueue.size() > 0 || worker.isFlushing();
- }
-
-
- /**
- * Check if we need to beginFlush. If we do, perform the beginFlush
- */
- private void checkFlush() {
-
- //there's no beginFlush pending and we're past the timeout or count
- if ( currentCounter.getCreateTimestamp() + graphFig.getCounterFlushInterval() > timeService.getCurrentTime()
- || currentCounter.getInvokeCount() >= graphFig.getCounterFlushCount() ) {
- beginFlush();
- }
- }
-
-
- /**
- * Worker that will take from the queue
- */
- private static class FlushWorker implements Action0 {
-
- private final BlockingQueue<Counter> counterQueue;
- private final NodeShardCounterSerialization nodeShardCounterSerialization;
-
- private volatile Counter rollUp;
-
-
- private FlushWorker( final BlockingQueue<Counter> counterQueue,
- final NodeShardCounterSerialization nodeShardCounterSerialization ) {
- this.counterQueue = counterQueue;
- this.nodeShardCounterSerialization = nodeShardCounterSerialization;
- }
-
-
- @Override
- public void call() {
-
-
- while ( true ) {
- /**
- * Block taking the first element. Once we take this, batch drain and roll up the rest
- */
-
- try {
- rollUp = null;
- rollUp = counterQueue.take();
- }
- catch ( InterruptedException e ) {
- LOG.error( "Unable to read from counter queue", e );
- throw new RuntimeException( "Unable to read from counter queue", e );
-
- }
-
-
-
-
- //copy to the batch outside of the command for performance
- final MutationBatch batch = nodeShardCounterSerialization.flush( rollUp );
-
- /**
- * Execute the command in hystrix to avoid slamming cassandra
- */
- new HystrixCommand( COUNT_GROUP ) {
-
- @Override
- protected Void run() throws Exception {
- batch.execute();
-
- return null;
- }
-
-
- @Override
- protected Object getFallback() {
- //we've failed to mutate. Merge this count back into the current one
- counterQueue.offer( rollUp );
-
- return null;
- }
- }.execute();
- }
-
- }
-
-
- /**
- * Return true if we're in the process of flushing
- * @return
- */
- public boolean isFlushing(){
- return rollUp != null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
deleted file mode 100644
index aafbd26..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
-
-
-import org.apache.usergrid.persistence.core.migration.schema.Migration;
-
-import com.netflix.astyanax.MutationBatch;
-
-
-/**
- * Serialization for flushing and reading counters
- */
-public interface NodeShardCounterSerialization extends Migration {
-
-
- /**
- * Flush the counter to the mutation batch
- * @param counter
- * @return
- */
- public MutationBatch flush(Counter counter);
-
-
- /**
- * Get the count of this shard, if it exists.
- * @param key The shard key to get
- * @return
- */
- public long getCount(ShardKey key);
-
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
deleted file mode 100644
index 6934275..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
-
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.CounterColumnType;
-
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
-import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
-import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
-import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeShardRowKeySerializer;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.OperationResult;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
-import com.netflix.astyanax.model.Column;
-import com.netflix.astyanax.model.CompositeBuilder;
-import com.netflix.astyanax.model.CompositeParser;
-import com.netflix.astyanax.serializers.BooleanSerializer;
-
-
-@Singleton
-public class NodeShardCounterSerializationImpl implements NodeShardCounterSerialization {
-
-
- private static final ShardKeySerializer SHARD_KEY_SERIALIZER = new ShardKeySerializer();
-
- /**
- * Edge shards
- */
- private static final MultiTennantColumnFamily<ScopedRowKey<ShardKey>, Boolean> EDGE_SHARD_COUNTS =
- new MultiTennantColumnFamily<>( "Edge_Shard_Counts",
- new ScopedRowKeySerializer<>( SHARD_KEY_SERIALIZER ), BooleanSerializer.get() );
-
-
- protected final Keyspace keyspace;
- protected final CassandraConfig cassandraConfig;
- protected final GraphFig graphFig;
-
-
- @Inject
- public NodeShardCounterSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
- final GraphFig graphFig ) {
- this.keyspace = keyspace;
- this.cassandraConfig = cassandraConfig;
- this.graphFig = graphFig;
- }
-
-
- @Override
- public MutationBatch flush( final Counter counter ) {
-
-
- Preconditions.checkNotNull( counter, "counter must be specified" );
-
-
- final MutationBatch batch = keyspace.prepareMutationBatch();
-
- for ( Map.Entry<ShardKey, AtomicLong> entry : counter.getEntries() ) {
-
- final ShardKey key = entry.getKey();
- final long value = entry.getValue().get();
-
-
- final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.scope.getApplication(), key );
-
-
- batch.withRow( EDGE_SHARD_COUNTS, rowKey ).incrementCounterColumn(true , value );
- }
-
-
- return batch;
- }
-
-
- @Override
- public long getCount( final ShardKey key ) {
-
- final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.scope.getApplication(), key );
-
-
- OperationResult<Column<Boolean>> column = null;
- try {
- column = keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( true ).execute();
- }
- //column not found, return 0
- catch ( NotFoundException nfe ) {
- return 0;
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to read from cassandra", e );
- }
-
- return column.getResult().getLongValue();
- }
-
-
- @Override
- public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
- return Collections.singleton(
- new MultiTennantColumnFamilyDefinition( EDGE_SHARD_COUNTS, BytesType.class.getSimpleName(),
- ColumnTypes.BOOLEAN, CounterColumnType.class.getSimpleName(),
- MultiTennantColumnFamilyDefinition.CacheOption.ALL ) );
- }
-
-
-
- private static class ShardKeySerializer implements CompositeFieldSerializer<ShardKey> {
-
-
- private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
-
- private static final EdgeShardRowKeySerializer EDGE_SHARD_ROW_KEY_SERIALIZER = EdgeShardRowKeySerializer.INSTANCE;
-
-
- @Override
- public void toComposite( final CompositeBuilder builder, final ShardKey key ) {
-
- ID_SER.toComposite( builder, key.scope.getApplication() );
-
- EDGE_SHARD_ROW_KEY_SERIALIZER.toComposite( builder, key.directedEdgeMeta );
-
- builder.addLong( key.shard.getShardIndex() );
-
- builder.addLong( key.shard.getCreatedTime() );
- }
-
-
- @Override
- public ShardKey fromComposite( final CompositeParser composite ) {
-
- final Id applicationId = ID_SER.fromComposite( composite );
-
- final ApplicationScope scope = new ApplicationScopeImpl( applicationId );
-
- final DirectedEdgeMeta directedEdgeMeta = EDGE_SHARD_ROW_KEY_SERIALIZER.fromComposite( composite );
-
- final long shardIndex = composite.readLong();
-
- final long shardCreatedTime = composite.readLong();
-
- return new ShardKey( scope, new Shard( shardIndex, shardCreatedTime, false ), directedEdgeMeta );
- }
-
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
deleted file mode 100644
index c976210..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
-
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-
-
-/**
- * Key for shards and counts
- */
-public class ShardKey {
- public final ApplicationScope scope;
- public final Shard shard;
- public final DirectedEdgeMeta directedEdgeMeta;
-
-
- public ShardKey( final ApplicationScope scope, final Shard shard, final DirectedEdgeMeta directedEdgeMeta ) {
- this.scope = scope;
- this.shard = shard;
- this.directedEdgeMeta = directedEdgeMeta;
- }
-
-
- @Override
- public boolean equals( final Object o ) {
- if ( this == o ) {
- return true;
- }
- if ( o == null || getClass() != o.getClass() ) {
- return false;
- }
-
- final ShardKey shardKey = ( ShardKey ) o;
-
- if ( !directedEdgeMeta.equals( shardKey.directedEdgeMeta ) ) {
- return false;
- }
- if ( !scope.equals( shardKey.scope ) ) {
- return false;
- }
- if ( !shard.equals( shardKey.shard ) ) {
- return false;
- }
-
- return true;
- }
-
-
- @Override
- public int hashCode() {
- int result = scope.hashCode();
- result = 31 * result + shard.hashCode();
- result = 31 * result + directedEdgeMeta.hashCode();
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index b0875af..8943737 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -37,7 +37,6 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEd
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
@@ -63,7 +62,6 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
private final EdgeShardSerialization edgeShardSerialization;
private final EdgeColumnFamilies edgeColumnFamilies;
private final ShardedEdgeSerialization shardedEdgeSerialization;
- private final NodeShardApproximation nodeShardApproximation;
private final TimeService timeService;
private final GraphFig graphFig;
private final ShardGroupCompaction shardGroupCompaction;
@@ -72,13 +70,11 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
@Inject
public NodeShardAllocationImpl( final EdgeShardSerialization edgeShardSerialization,
final EdgeColumnFamilies edgeColumnFamilies,
- final ShardedEdgeSerialization shardedEdgeSerialization,
- final NodeShardApproximation nodeShardApproximation, final TimeService timeService,
+ final ShardedEdgeSerialization shardedEdgeSerialization, final TimeService timeService,
final GraphFig graphFig, final ShardGroupCompaction shardGroupCompaction ) {
this.edgeShardSerialization = edgeShardSerialization;
this.edgeColumnFamilies = edgeColumnFamilies;
this.shardedEdgeSerialization = shardedEdgeSerialization;
- this.nodeShardApproximation = nodeShardApproximation;
this.timeService = timeService;
this.graphFig = graphFig;
this.shardGroupCompaction = shardGroupCompaction;
@@ -166,18 +162,11 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
* Check out if we have a count for our shard allocation
*/
- final long count = nodeShardApproximation.getCount( scope, shard, directedEdgeMeta );
+
final long shardSize = graphFig.getShardSize();
- if ( count < shardSize ) {
- return false;
- }
-
- if ( LOG.isDebugEnabled() ) {
- LOG.debug( "Count of {} has exceeded shard config of {} will begin compacting", count, shardSize );
- }
/**
* We want to allocate a new shard as close to the max value as possible. This way if we're filling up a
@@ -234,10 +223,10 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
/**
- * Sanity check in case our counters become severely out of sync with our edge state in cassandra.
+ * Sanity check in case we audit before we have a full shard
*/
if ( marked == null ) {
- LOG.warn( "Incorrect shard count for shard group {}", shardEntryGroup );
+ LOG.info( "Shard {} in shard group {} not full, not splitting", shardEntryGroup );
return false;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
index 6d2a009..ea10ed5 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
@@ -167,6 +167,13 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
continue;
}
+ //The shard is not compacted, we cannot remove it. This should never happen, a bit of an "oh shit" scenario.
+ //the isCompactionPending should return false in this case
+ if(!shard.isCompacted()){
+ logger.warn( "Shard {} in group {} is not compacted yet was checked. Short circuiting", shard, shardEntryGroup );
+ return DeleteResult.NO_OP;
+ }
+
final MutationBatch shardRemovalMutation =
edgeShardSerialization.removeShardMeta( applicationScope, shard, directedEdgeMeta );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
index 2ef50f5..1060495 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
@@ -123,10 +123,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
final Shard shard, final boolean isDeleted ) {
batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).putColumn( edge, isDeleted );
-
- if ( !isDeleted ) {
- writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
- }
}
}.createBatch( scope, shards, timestamp );
}
@@ -153,11 +149,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).putColumn( edge, isDeleted );
-
-
- if ( !isDeleted ) {
- writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
- }
}
}.createBatch( scope, shards, timestamp );
}
@@ -182,10 +173,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).putColumn( edge, isDeleted );
-
- if ( !isDeleted ) {
- writeEdgeShardStrategy.increment( scope, shard, 1, targetEdgeMeta );
- }
}
}.createBatch( scope, shards, timestamp );
}
@@ -212,11 +199,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
batch.withRow( columnFamilies.getTargetNodeSourceTypeCfName(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
.putColumn( edge, isDeleted );
-
-
- if ( !isDeleted ) {
- writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
- }
}
}.createBatch( scope, shards, timestamp );
}
@@ -241,11 +223,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
final boolean isDeleted ) {
batch.withRow( columnFamilies.getGraphEdgeVersions(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
.putColumn( column, isDeleted );
-
-
- if ( !isDeleted ) {
- writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
- }
}
}.createBatch( scope, shards, timestamp );
}
@@ -265,7 +242,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
final Shard shard, final boolean isDeleted ) {
batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).deleteColumn( edge );
- writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
}
}.createBatch( scope, shards, timestamp );
}
@@ -288,7 +264,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
batch.withRow( columnFamilies.getSourceNodeTargetTypeCfName(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
.deleteColumn( edge );
- writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
}
}.createBatch( scope, shards, timestamp );
}
@@ -308,7 +283,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
final Shard shard, final boolean isDeleted ) {
batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).deleteColumn( edge );
- writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
}
}.createBatch( scope, shards, timestamp );
}
@@ -331,7 +305,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
batch.withRow( columnFamilies.getTargetNodeSourceTypeCfName(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
.deleteColumn( edge );
- writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
}
}.createBatch( scope, shards, timestamp );
}
@@ -351,7 +324,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
final boolean isDeleted ) {
batch.withRow( columnFamilies.getGraphEdgeVersions(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
.deleteColumn( column );
- writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
}
}.createBatch( scope, shards, timestamp );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
index 8787d97..cf164ba 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
@@ -25,9 +25,7 @@ import java.util.Iterator;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
import com.google.inject.Inject;
@@ -42,14 +40,11 @@ public class SizebasedEdgeShardStrategy implements EdgeShardStrategy {
private final NodeShardCache shardCache;
- private final NodeShardApproximation shardApproximation;
@Inject
- public SizebasedEdgeShardStrategy( final NodeShardCache shardCache,
- final NodeShardApproximation shardApproximation ) {
+ public SizebasedEdgeShardStrategy( final NodeShardCache shardCache) {
this.shardCache = shardCache;
- this.shardApproximation = shardApproximation;
}
@@ -65,10 +60,4 @@ public class SizebasedEdgeShardStrategy implements EdgeShardStrategy {
return shardCache.getReadShardGroup( scope, maxTimestamp, directedEdgeMeta );
}
-
- @Override
- public void increment( final ApplicationScope scope, final Shard shard,
- final long count, final DirectedEdgeMeta directedEdgeMeta) {
- shardApproximation.increment( scope, shard, count, directedEdgeMeta );
- }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
deleted file mode 100644
index 2951efe..0000000
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing,
- * * software distributed under the License is distributed on an
- * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * * KIND, either express or implied. See the License for the
- * * specific language governing permissions and limitations
- * * under the License.
- *
- */
-package org.apache.usergrid.persistence.graph;
-
-
-import java.util.concurrent.TimeoutException;
-
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.core.test.ITRunner;
-import org.apache.usergrid.persistence.core.test.UseModules;
-import org.apache.usergrid.persistence.core.util.IdGenerator;
-import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-
-import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
-import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-
-@RunWith( ITRunner.class )
-@UseModules( TestGraphModule.class )
-@Ignore("Kills cassandra")
-public class GraphManagerShardingIT {
-
-
- @Inject
- @Rule
- public MigrationManagerRule migrationManagerRule;
-
-
- @Inject
- protected GraphManagerFactory emf;
-
-
- @Inject
- protected GraphFig graphFig;
-
- @Inject
- protected NodeShardApproximation nodeShardApproximation;
-
- protected ApplicationScope scope;
-
-
-
-
- @Before
- public void mockApp() {
- this.scope = new ApplicationScopeImpl( IdGenerator.createId( "application" ) );
- }
-
-
- @Test
- public void testWriteSourceType() throws TimeoutException, InterruptedException {
-
- GraphManager gm = emf.createEdgeManager( scope ) ;
-
- final Id sourceId = IdGenerator.createId( "source" );
- final String edgeType = "test";
-
-
-
-
- final long flushCount = graphFig.getCounterFlushCount();
- final long maxShardSize = graphFig.getShardSize();
-
-
-
-
- final long startTime = System.currentTimeMillis();
-
- //each edge causes 4 counts
- final long writeCount = flushCount/4;
-
- assertTrue( "Shard size must be >= beginFlush Count", maxShardSize >= flushCount );
-
- Id targetId = null;
-
- for(long i = 0; i < writeCount; i ++){
- targetId = IdGenerator.createId( "target" ) ;
-
- final Edge edge = createEdge( sourceId, edgeType, targetId);
-
- gm.writeEdge( edge ).toBlocking().last();
-
- }
-
-
-
- final DirectedEdgeMeta sourceEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( sourceId, edgeType,
- targetId.getType() );
- final Shard shard = new Shard(0, 0, true);
-
-
- long shardCount = nodeShardApproximation.getCount( scope, shard, sourceEdgeMeta );
-
- assertEquals("Shard count for source node should be the same as write count", writeCount, shardCount);
-
-
- //now verify it's correct for the target
- final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType(targetId, edgeType, sourceId.getType() );
-
-
- shardCount = nodeShardApproximation.getCount( scope, shard, targetEdgeMeta );
-
- assertEquals(1, shardCount);
-
- }
-
-
- @Test
- public void testWriteTargetType() throws TimeoutException, InterruptedException {
-
- GraphManager gm = emf.createEdgeManager( scope ) ;
-
- final Id targetId = IdGenerator.createId( "target" );
- final String edgeType = "test";
-
-
-
-
- final long flushCount = graphFig.getCounterFlushCount();
- final long maxShardSize = graphFig.getShardSize();
-
-
- //each edge causes 4 counts
- final long writeCount = flushCount/4;
-
- assertTrue( "Shard size must be >= beginFlush Count", maxShardSize >= flushCount );
-
- Id sourceId = null;
-
- for(long i = 0; i < writeCount; i ++){
- sourceId = IdGenerator.createId( "source" ) ;
-
- final Edge edge = createEdge( sourceId, edgeType, targetId);
-
- gm.writeEdge( edge ).toBlocking().last();
-
- }
-
-
- //this is from target->source, since the target id doesn't change
- final DirectedEdgeMeta targetMeta = DirectedEdgeMeta.fromTargetNode( targetId, edgeType );
- final Shard shard = new Shard(0l, 0l, true);
-
- long targetWithType = nodeShardApproximation.getCount( scope, shard, targetMeta );
-
- assertEquals("Shard count for target node should be the same as write count", writeCount, targetWithType);
-
-
- final DirectedEdgeMeta targetNodeSource = DirectedEdgeMeta.fromTargetNodeSourceType( targetId, edgeType, "source" );
-
- long shardCount = nodeShardApproximation.getCount( scope, shard, targetNodeSource );
-
- assertEquals("Shard count for target node should be the same as write count", writeCount, shardCount);
-
-
- //now verify it's correct for the target
-
- final DirectedEdgeMeta sourceMeta = DirectedEdgeMeta.fromSourceNode( sourceId, edgeType );
-
- shardCount = nodeShardApproximation.getCount( scope, shard, sourceMeta );
-
- assertEquals(1, shardCount);
-
- }
-
-
-
-
-}
-
-
-
-
-
http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index ac965cd..bc364cc 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -98,15 +98,13 @@ public class NodeShardAllocationTest {
final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
- final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
-
final TimeService timeService = mock( TimeService.class );
NodeShardAllocation approximation =
new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
- nodeShardCounterSerialization, timeService, graphFig, shardGroupCompaction );
+ timeService, graphFig, shardGroupCompaction );
final long timeservicetime = System.currentTimeMillis();
@@ -131,15 +129,12 @@ public class NodeShardAllocationTest {
final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
- final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
-
-
final TimeService timeService = mock( TimeService.class );
NodeShardAllocation approximation =
new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
- nodeShardCounterSerialization, timeService, graphFig, shardGroupCompaction );
+ timeService, graphFig, shardGroupCompaction );
final Id nodeId = IdGenerator.createId( "test" );
final String type = "type";
@@ -175,14 +170,11 @@ public class NodeShardAllocationTest {
final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
- final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-
-
final TimeService timeService = mock( TimeService.class );
NodeShardAllocation approximation =
new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
- nodeShardApproximation, timeService, graphFig, shardGroupCompaction );
+ timeService, graphFig, shardGroupCompaction );
final Id nodeId = IdGenerator.createId( "test" );
final String type = "type";
@@ -205,8 +197,6 @@ public class NodeShardAllocationTest {
final long count = graphFig.getShardSize() - 1;
- when( nodeShardApproximation.getCount( scope, futureShard, targetEdgeMeta ) ).thenReturn( count );
-
final boolean result = approximation.auditShard( scope, shardEntryGroup, targetEdgeMeta );
assertFalse( "Shard allocated", result );
@@ -223,14 +213,12 @@ public class NodeShardAllocationTest {
final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
- final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-
final TimeService timeService = mock( TimeService.class );
NodeShardAllocation approximation =
new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
- nodeShardApproximation, timeService, graphFig, shardGroupCompaction );
+ timeService, graphFig, shardGroupCompaction );
final Id nodeId = IdGenerator.createId( "test" );
final String type = "type";
@@ -255,9 +243,6 @@ public class NodeShardAllocationTest {
final long shardCount = ( long ) ( graphFig.getShardSize() * 2.5 );
- //return a shard size equal to our max
- when( nodeShardApproximation.getCount( scope, futureShard, targetEdgeMeta ) ).thenReturn( shardCount );
-
//this is how many we should be iterating and should set the value of the last shard we keep
final int numToIterate = ( int ) ( graphFig.getShardSize() * 2 );
@@ -338,14 +323,12 @@ public class NodeShardAllocationTest {
final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
- final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-
final TimeService timeService = mock( TimeService.class );
NodeShardAllocation approximation =
new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
- nodeShardApproximation, timeService, graphFig, shardGroupCompaction );
+ timeService, graphFig, shardGroupCompaction );
final Id nodeId = IdGenerator.createId( "test" );
final String type = "type";
@@ -378,9 +361,6 @@ public class NodeShardAllocationTest {
iteratedEdges.add( returnedEdge );
- //return a shard size equal to our max
- when( nodeShardApproximation.getCount( scope, futureShard, targetEdgeMeta ) ).thenReturn( shardCount );
-
ArgumentCaptor<Shard> shardValue = ArgumentCaptor.forClass( Shard.class );
@@ -429,14 +409,12 @@ public class NodeShardAllocationTest {
final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
- final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-
final TimeService timeService = mock( TimeService.class );
NodeShardAllocation approximation =
new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
- nodeShardApproximation, timeService, graphFig, shardGroupCompaction );
+ timeService, graphFig, shardGroupCompaction );
final Id nodeId = IdGenerator.createId( "test" );
final String type = "type";
@@ -456,8 +434,6 @@ public class NodeShardAllocationTest {
final long shardCount = graphFig.getShardSize();
- //return a shard size equal to our max
- when( nodeShardApproximation.getCount( scope, futureShard, targetEdgeMeta ) ).thenReturn( shardCount );
ArgumentCaptor<Shard> shardValue = ArgumentCaptor.forClass( Shard.class );
@@ -495,7 +471,6 @@ public class NodeShardAllocationTest {
final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
- final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
final TimeService timeService = mock( TimeService.class );
@@ -503,7 +478,7 @@ public class NodeShardAllocationTest {
NodeShardAllocation approximation =
new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
- nodeShardApproximation, timeService, graphFig, shardGroupCompaction );
+ timeService, graphFig, shardGroupCompaction );
final Id nodeId = IdGenerator.createId( "test" );
final String type = "type";
@@ -630,9 +605,6 @@ public class NodeShardAllocationTest {
final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
- final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-
-
final TimeService timeService = mock( TimeService.class );
final long returnTime = System.currentTimeMillis() + graphFig.getShardCacheTimeout() * 2;
@@ -643,7 +615,7 @@ public class NodeShardAllocationTest {
NodeShardAllocation approximation =
new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
- nodeShardApproximation, timeService, graphFig, shardGroupCompaction );
+ timeService, graphFig, shardGroupCompaction );
final Id nodeId = IdGenerator.createId( "test" );
final String type = "type";
@@ -712,8 +684,6 @@ public class NodeShardAllocationTest {
final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
- final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-
/**
* Return 100000 milliseconds
@@ -736,7 +706,7 @@ public class NodeShardAllocationTest {
NodeShardAllocation approximation =
new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
- nodeShardApproximation, timeService, graphFig, shardGroupCompaction );
+ timeService, graphFig, shardGroupCompaction );
/**
http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
deleted file mode 100644
index 32a0cda..0000000
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
+++ /dev/null
@@ -1,627 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
-
-
-import java.beans.PropertyChangeListener;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.safehaus.guicyfig.Bypass;
-import org.safehaus.guicyfig.OptionState;
-import org.safehaus.guicyfig.Overrides;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.consistency.TimeService;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.util.IdGenerator;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.netflix.astyanax.ColumnListMutation;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.WriteAheadLog;
-import com.netflix.astyanax.connectionpool.Host;
-import com.netflix.astyanax.connectionpool.OperationResult;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.model.ColumnFamily;
-import com.netflix.astyanax.model.ConsistencyLevel;
-import com.netflix.astyanax.retry.RetryPolicy;
-
-import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class NodeShardApproximationTest {
-
- private static final Logger LOG = LoggerFactory.getLogger( NodeShardApproximation.class );
-
- private GraphFig graphFig;
-
- private NodeShardCounterSerialization nodeShardCounterSerialization;
- private TimeService timeService;
-
- protected ApplicationScope scope;
-
-
- @Before
- public void setup() {
- scope = mock( ApplicationScope.class );
-
- Id orgId = mock( Id.class );
-
- when( orgId.getType() ).thenReturn( "organization" );
- when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
-
- when( scope.getApplication() ).thenReturn( orgId );
-
- graphFig = mock( GraphFig.class );
-
- when( graphFig.getShardCacheSize() ).thenReturn( 10000l );
- when( graphFig.getShardSize() ).thenReturn( 250000l );
- when( graphFig.getCounterFlushQueueSize() ).thenReturn( 10000 );
-
- nodeShardCounterSerialization = mock( NodeShardCounterSerialization.class );
-
- when( nodeShardCounterSerialization.flush( any( Counter.class ) ) ).thenReturn( mock( MutationBatch.class ) );
-
-
- timeService = mock( TimeService.class );
-
- when( timeService.getCurrentTime() ).thenReturn( System.currentTimeMillis() );
- }
-
-
- @Test
- public void testSingleShard() throws InterruptedException {
-
-
- when( graphFig.getCounterFlushCount() ).thenReturn( 100000l );
- NodeShardApproximation approximation =
- new NodeShardApproximationImpl( graphFig, nodeShardCounterSerialization, timeService );
-
-
- final Id id = IdGenerator.createId( "test" );
- final Shard shard = new Shard( 0, 0, true );
- final String type = "type";
- final String type2 = "subType";
-
- final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( id, type, type2 );
-
- long count = approximation.getCount( scope, shard, directedEdgeMeta );
-
- waitForFlush( approximation );
-
- assertEquals( 0, count );
- }
-
-
- @Ignore("outdated and no longer relevant test")
- @Test
- public void testSingleShardMultipleThreads() throws ExecutionException, InterruptedException {
-
-
- NodeShardCounterSerialization serialization = new TestNodeShardCounterSerialization();
-
- final NodeShardApproximation approximation =
- new NodeShardApproximationImpl( new TestGraphFig(), serialization, new TestTimeService() );
-
-
- final int increments = 1000000;
- final int workers = Runtime.getRuntime().availableProcessors() * 2;
-
- final Id id = IdGenerator.createId( "test" );
- final String type = "type";
- final String type2 = "subType";
-
- final Shard shard = new Shard( 10000, 0, true );
-
- final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( id, type, type2 );
-
- ExecutorService executor = Executors.newFixedThreadPool( workers );
-
- List<Future<Long>> futures = new ArrayList<>( workers );
-
- for ( int i = 0; i < workers; i++ ) {
-
- final Future<Long> future = executor.submit( new Callable<Long>() {
- @Override
- public Long call() throws Exception {
-
- for ( int i = 0; i < increments; i++ ) {
- approximation.increment( scope, shard, 1, directedEdgeMeta );
- }
-
- return 0l;
- }
- } );
-
- futures.add( future );
- }
-
-
- for ( Future<Long> future : futures ) {
- future.get();
- }
-
- waitForFlush( approximation );
- //get our count. It should be accurate b/c we only have 1 instance
-
- final long returnedCount = approximation.getCount( scope, shard, directedEdgeMeta );
- final long expected = workers * increments;
-
-
- assertEquals( expected, returnedCount );
-
- //test we get nothing with the other type
-
- final long emptyCount =
- approximation.getCount( scope, shard, DirectedEdgeMeta.fromSourceNodeTargetType( id, type, type2 ) );
-
-
- assertEquals( 0, emptyCount );
- }
-
-
- @Ignore("outdated and no longer relevant test")
- @Test
- public void testMultipleShardMultipleThreads() throws ExecutionException, InterruptedException {
-
-
- NodeShardCounterSerialization serialization = new TestNodeShardCounterSerialization();
-
- final NodeShardApproximation approximation =
- new NodeShardApproximationImpl( new TestGraphFig(), serialization, new TestTimeService() );
-
-
- final int increments = 1000000;
- final int workers = Runtime.getRuntime().availableProcessors() * 2;
-
- final Id id = IdGenerator.createId( "test" );
- final String type = "type";
- final String type2 = "subType";
-
- final AtomicLong shardIdCounter = new AtomicLong();
-
-
- final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( id, type, type2 );
-
-
- ExecutorService executor = Executors.newFixedThreadPool( workers );
-
- List<Future<Shard>> futures = new ArrayList<>( workers );
-
- for ( int i = 0; i < workers; i++ ) {
-
- final Future<Shard> future = executor.submit( new Callable<Shard>() {
- @Override
- public Shard call() throws Exception {
-
- final long threadShardId = shardIdCounter.incrementAndGet();
-
- final Shard shard = new Shard( threadShardId, 0, true );
-
- for ( int i = 0; i < increments; i++ ) {
- approximation.increment( scope, shard, 1, directedEdgeMeta );
- }
-
- return shard;
- }
- } );
-
- futures.add( future );
- }
-
-
- for ( Future<Shard> future : futures ) {
- final Shard shardId = future.get();
-
- waitForFlush( approximation );
-
- final long returnedCount = approximation.getCount( scope, shardId, directedEdgeMeta );
-
- assertEquals( increments, returnedCount );
- }
- }
-
-
- private void waitForFlush( NodeShardApproximation approximation ) throws InterruptedException {
-
- approximation.beginFlush();
-
- while ( approximation.flushPending() ) {
-
- LOG.info( "Waiting on beginFlush to complete" );
-
- Thread.sleep( 100 );
- }
- }
-
-
- /**
- * These are created b/c we can't use Mockito. It OOM's with keeping track of all the mock invocations
- */
-
- private static class TestNodeShardCounterSerialization implements NodeShardCounterSerialization {
-
- private Counter copy = new Counter();
-
-
- @Override
- public MutationBatch flush( final Counter counter ) {
- copy.merge( counter );
- return new TestMutationBatch();
- }
-
-
- @Override
- public long getCount( final ShardKey key ) {
- return copy.get( key );
- }
-
-
- @Override
- public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
- }
-
-
- /**
- * Simple test mutation to no-op during tests
- */
- private
- static class TestMutationBatch implements MutationBatch {
-
- @Override
- public <K, C> ColumnListMutation<C> withRow( final ColumnFamily<K, C> columnFamily, final K rowKey ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public <K> void deleteRow( final Iterable<? extends ColumnFamily<K, ?>> columnFamilies, final K rowKey ) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public void discardMutations() {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public void mergeShallow( final MutationBatch other ) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public boolean isEmpty() {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public int getRowCount() {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public Map<ByteBuffer, Set<String>> getRowKeys() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public MutationBatch pinToHost( final Host host ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public MutationBatch setConsistencyLevel( final ConsistencyLevel consistencyLevel ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public MutationBatch withConsistencyLevel( final ConsistencyLevel consistencyLevel ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public MutationBatch withRetryPolicy( final RetryPolicy retry ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public MutationBatch usingWriteAheadLog( final WriteAheadLog manager ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public MutationBatch lockCurrentTimestamp() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public MutationBatch setTimeout( final long timeout ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public MutationBatch setTimestamp( final long timestamp ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public MutationBatch withTimestamp( final long timestamp ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public MutationBatch withAtomicBatch( final boolean condition ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public ByteBuffer serialize() throws Exception {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public void deserialize( final ByteBuffer data ) throws Exception {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public OperationResult<Void> execute() throws ConnectionException {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public ListenableFuture<OperationResult<Void>> executeAsync() throws ConnectionException {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
- }
-
-
- private static class TestGraphFig implements GraphFig {
-
- @Override
- public int getScanPageSize() {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public int getRepairConcurrentSize() {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public double getShardRepairChance() {
- return 0;
- }
-
-
- @Override
- public long getShardSize() {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public long getShardCacheTimeout() {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public long getShardMinDelta() {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public long getShardCacheSize() {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public int getShardCacheRefreshWorkerCount() {
- return 0;
- }
-
-
- @Override
- public int getShardAuditWorkerCount() {
- return 0;
- }
-
-
- @Override
- public int getShardAuditWorkerQueueSize() {
- return 0;
- }
-
-
- @Override
- public long getCounterFlushCount() {
- return 100000l;
- }
-
-
- @Override
- public long getCounterFlushInterval() {
- return 30000l;
- }
-
-
- @Override
- public int getCounterFlushQueueSize() {
- return 10000;
- }
-
-
- @Override
- public void addPropertyChangeListener( final PropertyChangeListener propertyChangeListener ) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public void removePropertyChangeListener( final PropertyChangeListener propertyChangeListener ) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public OptionState[] getOptions() {
- return new OptionState[0]; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public OptionState getOption( final String s ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public String getKeyByMethod( final String s ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public Object getValueByMethod( final String s ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public Properties filterOptions( final Properties properties ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public Map<String, Object> filterOptions( final Map<String, Object> stringObjectMap ) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public void override( final String s, final String s2 ) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public boolean setOverrides( final Overrides overrides ) {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public Overrides getOverrides() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public void bypass( final String s, final String s2 ) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public boolean setBypass( final Bypass bypass ) {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public Bypass getBypass() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public Class getFigInterface() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- @Override
- public boolean isSingleton() {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
- }
-
-
- private static class TestTimeService implements TimeService {
-
- @Override
- public long getCurrentTime() {
- return System.currentTimeMillis();
- }
- }
-}