You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/12 13:40:00 UTC

[1/2] usergrid git commit: Use CQL batch statement for timeout operation as well.

Repository: usergrid
Updated Branches:
  refs/heads/usergrid-1318-queue 775257d27 -> f56e1b0d1


Use CQL batch statement for timeout operation as well.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2afaa920
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2afaa920
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2afaa920

Branch: refs/heads/usergrid-1318-queue
Commit: 2afaa9201776bbfa8d999a6532a1d5eae3b650bc
Parents: 775257d
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Oct 11 15:23:02 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Oct 11 15:23:02 2016 -0400

----------------------------------------------------------------------
 .../distributed/actors/QueueTimeouter.java      | 25 +-----
 .../qakka/distributed/actors/QueueWriter.java   |  2 +-
 .../QueueMessageSerialization.java              |  5 ++
 .../impl/QueueMessageSerializationImpl.java     | 81 ++++++++++++++++----
 4 files changed, 75 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/2afaa920/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
index b7a95df..58afc76 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
@@ -99,30 +99,7 @@ public class QueueTimeouter extends UntypedActor {
                     if ((currentTime - queueMessage.getInflightAt()) > qakkaFig.getQueueTimeoutSeconds() * 1000) {
 
                         // put message back in messages_available table as new queue message with new UUID
-
-                        UUID newQueueMessageId = QakkaUtils.getTimeUuid();
-
-                        DatabaseQueueMessage newMessage = new DatabaseQueueMessage(
-                                queueMessage.getMessageId(),
-                                DatabaseQueueMessage.Type.DEFAULT,
-                                queueMessage.getQueueName(),
-                                queueMessage.getRegion(),
-                                null,
-                                queueMessage.getQueuedAt(),
-                                queueMessage.getInflightAt(),
-                                newQueueMessageId );
-
-                        messageSerialization.writeMessage( newMessage );
-
-                        // remove message from inflight table
-
-                        messageSerialization.deleteMessage(
-                                queueName,
-                                actorSystemFig.getRegionLocal(),
-                                null,
-                                DatabaseQueueMessage.Type.INFLIGHT,
-                                queueMessage.getQueueMessageId() );
-
+                        messageSerialization.timeoutInflight( queueMessage );
                         count++;
                     }
                 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2afaa920/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
index a7dbbd0..c9be47f 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -92,7 +92,7 @@ public class QueueWriter extends UntypedActor {
                             qa.getDestRegion(),
                             null,
                             currentTime,
-                            currentTime,
+                            -1L,
                             queueMessageId );
 
                     messageSerialization.writeMessage( dbqm );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2afaa920/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
index 86c50a5..434c965 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
@@ -56,4 +56,9 @@ public interface QueueMessageSerialization extends Migration {
      * Write message to inflight table and remove from available table
      */
     void putInflight( DatabaseQueueMessage queueMessage );
+
+    /**
+     * Remove message from inflight table, write message to available table.
+     */
+    void timeoutInflight( DatabaseQueueMessage queueMessage );
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2afaa920/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
index fba2bed..708132c 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
@@ -44,6 +44,7 @@ import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounter
 import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.java8.FuturesConvertersImpl;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -274,18 +275,14 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
     @Override
     public void putInflight( DatabaseQueueMessage message ) {
 
-        // create statement to write new queue message to inflight table
-
-        Shard.Type shardType = Shard.Type.INFLIGHT;
-        Shard shard = shardStrategy.selectShard(
-            message.getQueueName(), message.getRegion(), shardType, message.getQueueMessageId() );
+        // create statement to write queue message to inflight table
 
         DatabaseQueueMessage inflightMessage = new DatabaseQueueMessage(
             message.getMessageId(),
             DatabaseQueueMessage.Type.INFLIGHT,
             message.getQueueName(),
             message.getRegion(),
-            shard.getShardId(),
+            null,
             message.getQueuedAt(),
             System.currentTimeMillis(),
             message.getQueueMessageId() );
@@ -321,6 +318,54 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
     }
 
 
+    @Override
+    public void timeoutInflight( DatabaseQueueMessage message ) {
+        // Moves a timed-out inflight message back to the available (DEFAULT) table, then updates counters.
+        // create statement to write queue message back to available table, with new UUID
+
+        UUID newQueueMessageId = QakkaUtils.getTimeUuid();
+
+        DatabaseQueueMessage newMessage = new DatabaseQueueMessage(
+            message.getMessageId(),
+            DatabaseQueueMessage.Type.DEFAULT,
+            message.getQueueName(),
+            message.getRegion(),
+            null, // shard id left unset; presumably resolved when the write statement is built -- confirm
+            System.currentTimeMillis(), // queuedAt is reset to now rather than carried over from the inflight message
+            -1L, // inflightAt; presumably -1 marks "not inflight" -- confirm
+            newQueueMessageId );
+
+        Statement write = createWriteMessageStatement( newMessage );
+
+        // create statement to remove message from inflight table
+        // NOTE(review): table is chosen by message.getType(); assumes caller passes an INFLIGHT message -- confirm
+        Statement delete = createDeleteMessageStatement(
+            message.getQueueName(),
+            message.getRegion(),
+            message.getShardId(),
+            message.getType(),
+            message.getQueueMessageId());
+
+        // execute statements as a batch
+
+        BatchStatement batchStatement = new BatchStatement();
+        batchStatement.add( write );
+        batchStatement.add( delete );
+        cassandraClient.getQueueMessageSession().execute( batchStatement );
+
+        // bump counters (issued after, and separately from, the batch above)
+        // NOTE(review): DEFAULT shard counter is keyed by the inflight message's shard id, not the new message's -- confirm
+        shardCounterSerialization.incrementCounter(
+            message.getQueueName(), Shard.Type.DEFAULT, message.getShardId(), 1 );
+
+        messageCounterSerialization.incrementCounter(
+            message.getQueueName(), DatabaseQueueMessage.Type.DEFAULT, 1L );
+
+        messageCounterSerialization.decrementCounter(
+            message.getQueueName(), DatabaseQueueMessage.Type.INFLIGHT, 1L );
+    }
+
+
     private Statement createDeleteMessageStatement( final String queueName,
                                                     final String region,
                                                     final Long shardIdOrNull,
@@ -357,20 +402,30 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
         final UUID queueMessageId =  message.getQueueMessageId() == null ?
             QakkaUtils.getTimeUuid() : message.getQueueMessageId();
 
-        long queuedAt = message.getQueuedAt() == null ?
-            System.currentTimeMillis() : message.getQueuedAt();
+        final long shardId;
+
+        if ( message.getShardId() != null ) {
+            shardId = message.getShardId();
 
-        long inflightAt = message.getInflightAt() == null ?
-            message.getQueuedAt() : message.getInflightAt();
+        } else if ( DatabaseQueueMessage.Type.DEFAULT.equals( message.getType() )) {
+            Shard shard = shardStrategy.selectShard(
+                message.getQueueName(), message.getRegion(), Shard.Type.DEFAULT, message.getQueueMessageId() );
+            shardId = shard.getShardId();
+
+        } else {
+            Shard shard = shardStrategy.selectShard(
+                message.getQueueName(), message.getRegion(), Shard.Type.INFLIGHT, message.getQueueMessageId() );
+            shardId = shard.getShardId();
+        }
 
         Statement insert = QueryBuilder.insertInto(getTableName(message.getType()))
             .value( COLUMN_QUEUE_NAME,       message.getQueueName())
             .value( COLUMN_REGION,           message.getRegion())
-            .value( COLUMN_SHARD_ID,         message.getShardId())
+            .value( COLUMN_SHARD_ID,         shardId)
             .value( COLUMN_MESSAGE_ID,       message.getMessageId())
             .value( COLUMN_QUEUE_MESSAGE_ID, queueMessageId)
-            .value( COLUMN_INFLIGHT_AT,      inflightAt )
-            .value( COLUMN_QUEUED_AT,        queuedAt)
+            .value( COLUMN_INFLIGHT_AT,      message.getInflightAt())
+            .value( COLUMN_QUEUED_AT,        message.getQueuedAt())
             .using( QueryBuilder.ttl( maxTtl ) );
 
         return insert;


[2/2] usergrid git commit: Adding concurrent tests for shard and message counters.

Posted by sn...@apache.org.
Adding concurrent tests for shard and message counters.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f56e1b0d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f56e1b0d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f56e1b0d

Branch: refs/heads/usergrid-1318-queue
Commit: f56e1b0d18cc8bb224dcede58d8171df8f0c0858
Parents: 2afaa92
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Oct 12 09:38:05 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Oct 12 09:38:05 2016 -0400

----------------------------------------------------------------------
 .../impl/MessageCounterSerializationTest.java   | 50 +++++++++++++++++++
 .../sharding/ShardCounterSerializationTest.java | 51 ++++++++++++++++++++
 2 files changed, 101 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/f56e1b0d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
index a4ea0f1..1f18db7 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
@@ -28,6 +28,11 @@ import org.apache.usergrid.persistence.qakka.serialization.queuemessages.Message
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
 import static org.junit.Assert.fail;
 
 
@@ -87,4 +92,49 @@ public class MessageCounterSerializationTest extends AbstractTest {
         Assert.assertEquals( 70, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
     }
 
+
+    @Test
+    public void testConcurrentOperation() {
+
+        // create multiple threads, each will increment and decrement counter by same number
+
+        Injector injector = getInjector();
+        MessageCounterSerialization mcs = injector.getInstance( MessageCounterSerialization.class );
+        String queueName = "mtco_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        // random suffix gives each run its own queue name
+        int poolSize = 20;
+        int numThreads = 20;
+        int numCounts = 3000;
+        ExecutorService execService = Executors.newFixedThreadPool( poolSize );
+
+        for (int i = 0; i < numThreads; i++) {
+            // NOTE(review): the Future returned by submit() is discarded, so an exception in a task is not reported
+            execService.submit( () -> {
+
+                for ( int j = 0; j < numCounts; j++ ) {
+                    mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 1 );
+                }
+
+                for ( int k = 0; k < numCounts; k++ ) {
+                    mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 1 );
+                }
+            });
+        }
+        // no new tasks accepted after this; already-submitted tasks still run to completion
+        execService.shutdown();
+        // poll every 3 seconds until all tasks finish, printing the current counter value as progress
+        try {
+            while (!execService.awaitTermination( 3, TimeUnit.SECONDS )) {
+                System.out.println( "Waiting... " +
+                    mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT )  );
+            }
+        } catch (InterruptedException e) {
+            e.printStackTrace(); // NOTE(review): interrupt is swallowed; the assertion below still runs on partial work
+        }
+
+        // at end counter should be zero
+
+        Assert.assertEquals( 0, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f56e1b0d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
index 8dc16bb..bc80943 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
@@ -19,14 +19,21 @@
 
 package org.apache.usergrid.persistence.qakka.serialization.sharding;
 
+import com.google.inject.Injector;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.usergrid.persistence.qakka.core.CassandraClient;
 import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
 import org.apache.usergrid.persistence.qakka.AbstractTest;
 import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
 import static org.junit.Assert.fail;
 
 
@@ -58,4 +65,48 @@ public class ShardCounterSerializationTest extends AbstractTest {
         Assert.assertEquals( 210, scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) );
     }
 
+
+    @Test
+    public void testConcurrentOperation() {
+
+        // create multiple threads, each will increment counter by some number
+
+        Injector injector = getInjector();
+        ShardCounterSerialization scs = injector.getInstance( ShardCounterSerialization.class );
+        String queueName = "stco_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        long shardId = 100L;
+        // all tasks increment the same (queueName, DEFAULT, shardId) counter
+        int poolSize = 20;
+        int numThreads = 20;
+        int numCounts = 3000;
+        ExecutorService execService = Executors.newFixedThreadPool( poolSize );
+
+        for (int i = 0; i < numThreads; i++) {
+            // NOTE(review): the Future returned by submit() is discarded, so an exception in a task is not reported
+            execService.submit( () -> {
+
+                for ( int j = 0; j < numCounts; j++ ) {
+                    scs.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, 1 );
+                }
+
+            });
+        }
+        // no new tasks accepted after this; already-submitted tasks still run to completion
+        execService.shutdown();
+        // poll every 3 seconds until all tasks finish, printing the current counter value as progress
+        try {
+            while (!execService.awaitTermination( 3, TimeUnit.SECONDS )) {
+                System.out.println( "Waiting... " +
+                    scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId )  );
+            }
+        } catch (InterruptedException e) {
+            e.printStackTrace(); // NOTE(review): interrupt is swallowed; the assertion below still runs on partial work
+        }
+
+        // test that counter is correct value
+        // expected total: numThreads tasks x numCounts increments of 1 each
+        Assert.assertEquals( numThreads * numCounts,
+            scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) );
+    }
+
 }