You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/12 13:40:01 UTC
[2/2] usergrid git commit: Adding concurrent tests for shard and message counters.
Adding concurrent tests for shard and message counters.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f56e1b0d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f56e1b0d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f56e1b0d
Branch: refs/heads/usergrid-1318-queue
Commit: f56e1b0d18cc8bb224dcede58d8171df8f0c0858
Parents: 2afaa92
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Oct 12 09:38:05 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Oct 12 09:38:05 2016 -0400
----------------------------------------------------------------------
.../impl/MessageCounterSerializationTest.java | 50 +++++++++++++++++++
.../sharding/ShardCounterSerializationTest.java | 51 ++++++++++++++++++++
2 files changed, 101 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f56e1b0d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
index a4ea0f1..1f18db7 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
@@ -28,6 +28,11 @@ import org.apache.usergrid.persistence.qakka.serialization.queuemessages.Message
import org.junit.Assert;
import org.junit.Test;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
import static org.junit.Assert.fail;
@@ -87,4 +92,49 @@ public class MessageCounterSerializationTest extends AbstractTest {
Assert.assertEquals( 70, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
}
+
+    /**
+     * Verifies the message counter under concurrent use: each worker increments the
+     * counter numCounts times and then decrements it numCounts times, so the counter
+     * must read zero once every worker has finished.
+     */
+    @Test
+    public void testConcurrentOperation() {
+        Injector injector = getInjector();
+        MessageCounterSerialization mcs = injector.getInstance( MessageCounterSerialization.class );
+        String queueName = "mtco_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+
+        int poolSize = 20;
+        int numThreads = 20;
+        int numCounts = 3000;
+        ExecutorService execService = Executors.newFixedThreadPool( poolSize );
+
+        for (int i = 0; i < numThreads; i++) {
+            execService.submit( () -> {
+                for ( int j = 0; j < numCounts; j++ ) {
+                    mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 1 );
+                }
+                for ( int k = 0; k < numCounts; k++ ) {
+                    mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 1 );
+                }
+            });
+        }
+
+        execService.shutdown();
+        try {
+            while (!execService.awaitTermination( 3, TimeUnit.SECONDS )) {
+                System.out.println( "Waiting... " +
+                    mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+            }
+        } catch (InterruptedException e) {
+            // workers may still be running, so the final count would be meaningless
+            execService.shutdownNow();
+            Thread.currentThread().interrupt();
+            fail( "Interrupted while waiting for counter tasks to finish" );
+        }
+
+        // at end counter should be zero
+        Assert.assertEquals( 0, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+    }
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f56e1b0d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
index 8dc16bb..bc80943 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
@@ -19,14 +19,21 @@
package org.apache.usergrid.persistence.qakka.serialization.sharding;
+import com.google.inject.Injector;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.usergrid.persistence.qakka.core.CassandraClient;
import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
import org.apache.usergrid.persistence.qakka.AbstractTest;
import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.junit.Assert;
import org.junit.Test;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
import static org.junit.Assert.fail;
@@ -58,4 +65,48 @@ public class ShardCounterSerializationTest extends AbstractTest {
Assert.assertEquals( 210, scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) );
}
+
+    /**
+     * Verifies the shard counter under concurrent use: numThreads workers each increment
+     * the same shard counter numCounts times by one, so the final value must equal
+     * numThreads * numCounts.
+     */
+    @Test
+    public void testConcurrentOperation() {
+        Injector injector = getInjector();
+        ShardCounterSerialization scs = injector.getInstance( ShardCounterSerialization.class );
+        String queueName = "stco_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        long shardId = 100L;
+
+        int poolSize = 20;
+        int numThreads = 20;
+        int numCounts = 3000;
+        ExecutorService execService = Executors.newFixedThreadPool( poolSize );
+
+        for (int i = 0; i < numThreads; i++) {
+            execService.submit( () -> {
+                for ( int j = 0; j < numCounts; j++ ) {
+                    scs.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, 1 );
+                }
+            });
+        }
+
+        execService.shutdown();
+        try {
+            while (!execService.awaitTermination( 3, TimeUnit.SECONDS )) {
+                System.out.println( "Waiting... " +
+                    scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) );
+            }
+        } catch (InterruptedException e) {
+            // workers may still be running, so the final count would be meaningless
+            execService.shutdownNow();
+            Thread.currentThread().interrupt();
+            fail( "Interrupted while waiting for counter tasks to finish" );
+        }
+
+        // test that counter is correct value
+        Assert.assertEquals( numThreads * numCounts,
+            scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) );
+    }
+
}