You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/12 13:40:01 UTC

[2/2] usergrid git commit: Adding concurrent tests for shard and message counters.

Adding concurrent tests for shard and message counters.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f56e1b0d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f56e1b0d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f56e1b0d

Branch: refs/heads/usergrid-1318-queue
Commit: f56e1b0d18cc8bb224dcede58d8171df8f0c0858
Parents: 2afaa92
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Oct 12 09:38:05 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Oct 12 09:38:05 2016 -0400

----------------------------------------------------------------------
 .../impl/MessageCounterSerializationTest.java   | 50 +++++++++++++++++++
 .../sharding/ShardCounterSerializationTest.java | 51 ++++++++++++++++++++
 2 files changed, 101 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/f56e1b0d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
index a4ea0f1..1f18db7 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
@@ -28,6 +28,11 @@ import org.apache.usergrid.persistence.qakka.serialization.queuemessages.Message
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
 import static org.junit.Assert.fail;
 
 
@@ -87,4 +92,49 @@ public class MessageCounterSerializationTest extends AbstractTest {
         Assert.assertEquals( 70, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
     }
 
+
+    /**
+     * Runs 20 concurrent tasks that each increment the same message counter 3000 times and
+     * then decrement it 3000 times, and verifies the counter reads zero once all have finished.
+     */
+    @Test
+    public void testConcurrentOperation() {
+        Injector injector = getInjector();
+        MessageCounterSerialization mcs = injector.getInstance( MessageCounterSerialization.class );
+        String queueName = "mtco_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+
+        int poolSize = 20;
+        int numThreads = 20;
+        int numCounts = 3000;
+        ExecutorService execService = Executors.newFixedThreadPool( poolSize );
+
+        // execute() rather than submit(): a task failure reaches the worker thread's uncaught
+        // exception handler instead of being captured in a Future that nobody inspects
+        for (int i = 0; i < numThreads; i++) {
+            execService.execute( () -> {
+                for ( int j = 0; j < numCounts; j++ ) {
+                    mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 1 );
+                }
+                for ( int k = 0; k < numCounts; k++ ) {
+                    mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 1 );
+                }
+            });
+        }
+
+        execService.shutdown();
+        try {
+            while (!execService.awaitTermination( 3, TimeUnit.SECONDS )) {
+                System.out.println( "Waiting... " +
+                    mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT )  );
+            }
+        } catch (InterruptedException e) {
+            // workers may still be running, so asserting on the counter would be meaningless
+            execService.shutdownNow();
+            Thread.currentThread().interrupt();
+            fail( "Interrupted while waiting for counter tasks to finish" );
+        }
+        // at end counter should be zero
+        Assert.assertEquals( 0, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f56e1b0d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
index 8dc16bb..bc80943 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
@@ -19,14 +19,21 @@
 
 package org.apache.usergrid.persistence.qakka.serialization.sharding;
 
+import com.google.inject.Injector;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.usergrid.persistence.qakka.core.CassandraClient;
 import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
 import org.apache.usergrid.persistence.qakka.AbstractTest;
 import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
 import static org.junit.Assert.fail;
 
 
@@ -58,4 +65,48 @@ public class ShardCounterSerializationTest extends AbstractTest {
         Assert.assertEquals( 210, scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) );
     }
 
+
+    /**
+     * Runs 20 concurrent tasks that each increment the same shard counter 3000 times, and
+     * verifies the counter equals numThreads * numCounts once all tasks have finished.
+     */
+    @Test
+    public void testConcurrentOperation() {
+        Injector injector = getInjector();
+        ShardCounterSerialization scs = injector.getInstance( ShardCounterSerialization.class );
+        String queueName = "stco_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        long shardId = 100L;
+
+        int poolSize = 20;
+        int numThreads = 20;
+        int numCounts = 3000;
+        ExecutorService execService = Executors.newFixedThreadPool( poolSize );
+
+        // execute() rather than submit(): a task failure reaches the worker thread's uncaught
+        // exception handler instead of being captured in a Future that nobody inspects
+        for (int i = 0; i < numThreads; i++) {
+            execService.execute( () -> {
+                for ( int j = 0; j < numCounts; j++ ) {
+                    scs.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, 1 );
+                }
+            });
+        }
+
+        execService.shutdown();
+        try {
+            while (!execService.awaitTermination( 3, TimeUnit.SECONDS )) {
+                System.out.println( "Waiting... " +
+                    scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId )  );
+            }
+        } catch (InterruptedException e) {
+            // workers may still be running, so asserting on the counter would be meaningless
+            execService.shutdownNow();
+            Thread.currentThread().interrupt();
+            fail( "Interrupted while waiting for counter tasks to finish" );
+        }
+        // test that counter is correct value
+        Assert.assertEquals( numThreads * numCounts,
+            scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) );
+    }
+
 }