You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/12 13:40:00 UTC
[1/2] usergrid git commit: Use CQL batch statement for timeout
operation as well.
Repository: usergrid
Updated Branches:
refs/heads/usergrid-1318-queue 775257d27 -> f56e1b0d1
Use CQL batch statement for timeout operation as well.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2afaa920
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2afaa920
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2afaa920
Branch: refs/heads/usergrid-1318-queue
Commit: 2afaa9201776bbfa8d999a6532a1d5eae3b650bc
Parents: 775257d
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Oct 11 15:23:02 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Oct 11 15:23:02 2016 -0400
----------------------------------------------------------------------
.../distributed/actors/QueueTimeouter.java | 25 +-----
.../qakka/distributed/actors/QueueWriter.java | 2 +-
.../QueueMessageSerialization.java | 5 ++
.../impl/QueueMessageSerializationImpl.java | 81 ++++++++++++++++----
4 files changed, 75 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2afaa920/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
index b7a95df..58afc76 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
@@ -99,30 +99,7 @@ public class QueueTimeouter extends UntypedActor {
if ((currentTime - queueMessage.getInflightAt()) > qakkaFig.getQueueTimeoutSeconds() * 1000) {
// put message back in messages_available table as new queue message with new UUID
-
- UUID newQueueMessageId = QakkaUtils.getTimeUuid();
-
- DatabaseQueueMessage newMessage = new DatabaseQueueMessage(
- queueMessage.getMessageId(),
- DatabaseQueueMessage.Type.DEFAULT,
- queueMessage.getQueueName(),
- queueMessage.getRegion(),
- null,
- queueMessage.getQueuedAt(),
- queueMessage.getInflightAt(),
- newQueueMessageId );
-
- messageSerialization.writeMessage( newMessage );
-
- // remove message from inflight table
-
- messageSerialization.deleteMessage(
- queueName,
- actorSystemFig.getRegionLocal(),
- null,
- DatabaseQueueMessage.Type.INFLIGHT,
- queueMessage.getQueueMessageId() );
-
+ messageSerialization.timeoutInflight( queueMessage );
count++;
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2afaa920/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
index a7dbbd0..c9be47f 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -92,7 +92,7 @@ public class QueueWriter extends UntypedActor {
qa.getDestRegion(),
null,
currentTime,
- currentTime,
+ -1L,
queueMessageId );
messageSerialization.writeMessage( dbqm );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2afaa920/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
index 86c50a5..434c965 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
@@ -56,4 +56,9 @@ public interface QueueMessageSerialization extends Migration {
* Write message to inflight table and remove from available table
*/
void putInflight( DatabaseQueueMessage queueMessage );
+
+ /**
+ * Remove message from inflight table, write message to available table.
+ */
+ void timeoutInflight( DatabaseQueueMessage queueMessage );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2afaa920/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
index fba2bed..708132c 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
@@ -44,6 +44,7 @@ import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounter
import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.java8.FuturesConvertersImpl;
import java.util.Collection;
import java.util.Collections;
@@ -274,18 +275,14 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
@Override
public void putInflight( DatabaseQueueMessage message ) {
- // create statement to write new queue message to inflight table
-
- Shard.Type shardType = Shard.Type.INFLIGHT;
- Shard shard = shardStrategy.selectShard(
- message.getQueueName(), message.getRegion(), shardType, message.getQueueMessageId() );
+ // create statement to write queue message to inflight table
DatabaseQueueMessage inflightMessage = new DatabaseQueueMessage(
message.getMessageId(),
DatabaseQueueMessage.Type.INFLIGHT,
message.getQueueName(),
message.getRegion(),
- shard.getShardId(),
+ null,
message.getQueuedAt(),
System.currentTimeMillis(),
message.getQueueMessageId() );
@@ -321,6 +318,54 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
}
+ @Override
+ public void timeoutInflight( DatabaseQueueMessage message ) {
+
+ // create statement to write queue message back to available table, with new UUID
+
+ UUID newQueueMessageId = QakkaUtils.getTimeUuid();
+
+ DatabaseQueueMessage newMessage = new DatabaseQueueMessage(
+ message.getMessageId(),
+ DatabaseQueueMessage.Type.DEFAULT,
+ message.getQueueName(),
+ message.getRegion(),
+ null,
+ System.currentTimeMillis(),
+ -1L,
+ newQueueMessageId );
+
+ Statement write = createWriteMessageStatement( newMessage );
+
+ // create statement to remove message from inflight table
+
+ Statement delete = createDeleteMessageStatement(
+ message.getQueueName(),
+ message.getRegion(),
+ message.getShardId(),
+ message.getType(),
+ message.getQueueMessageId());
+
+ // execute statements as a batch
+
+ BatchStatement batchStatement = new BatchStatement();
+ batchStatement.add( write );
+ batchStatement.add( delete );
+ cassandraClient.getQueueMessageSession().execute( batchStatement );
+
+ // bump counters
+
+ shardCounterSerialization.incrementCounter(
+ message.getQueueName(), Shard.Type.DEFAULT, message.getShardId(), 1 );
+
+ messageCounterSerialization.incrementCounter(
+ message.getQueueName(), DatabaseQueueMessage.Type.DEFAULT, 1L );
+
+ messageCounterSerialization.decrementCounter(
+ message.getQueueName(), DatabaseQueueMessage.Type.INFLIGHT, 1L );
+ }
+
+
private Statement createDeleteMessageStatement( final String queueName,
final String region,
final Long shardIdOrNull,
@@ -357,20 +402,30 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
final UUID queueMessageId = message.getQueueMessageId() == null ?
QakkaUtils.getTimeUuid() : message.getQueueMessageId();
- long queuedAt = message.getQueuedAt() == null ?
- System.currentTimeMillis() : message.getQueuedAt();
+ final long shardId;
+
+ if ( message.getShardId() != null ) {
+ shardId = message.getShardId();
- long inflightAt = message.getInflightAt() == null ?
- message.getQueuedAt() : message.getInflightAt();
+ } else if ( DatabaseQueueMessage.Type.DEFAULT.equals( message.getType() )) {
+ Shard shard = shardStrategy.selectShard(
+ message.getQueueName(), message.getRegion(), Shard.Type.DEFAULT, message.getQueueMessageId() );
+ shardId = shard.getShardId();
+
+ } else {
+ Shard shard = shardStrategy.selectShard(
+ message.getQueueName(), message.getRegion(), Shard.Type.INFLIGHT, message.getQueueMessageId() );
+ shardId = shard.getShardId();
+ }
Statement insert = QueryBuilder.insertInto(getTableName(message.getType()))
.value( COLUMN_QUEUE_NAME, message.getQueueName())
.value( COLUMN_REGION, message.getRegion())
- .value( COLUMN_SHARD_ID, message.getShardId())
+ .value( COLUMN_SHARD_ID, shardId)
.value( COLUMN_MESSAGE_ID, message.getMessageId())
.value( COLUMN_QUEUE_MESSAGE_ID, queueMessageId)
- .value( COLUMN_INFLIGHT_AT, inflightAt )
- .value( COLUMN_QUEUED_AT, queuedAt)
+ .value( COLUMN_INFLIGHT_AT, message.getInflightAt())
+ .value( COLUMN_QUEUED_AT, message.getQueuedAt())
.using( QueryBuilder.ttl( maxTtl ) );
return insert;
[2/2] usergrid git commit: Adding concurrent tests for shard and
message counters.
Posted by sn...@apache.org.
Adding concurrent tests for shard and message counters.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f56e1b0d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f56e1b0d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f56e1b0d
Branch: refs/heads/usergrid-1318-queue
Commit: f56e1b0d18cc8bb224dcede58d8171df8f0c0858
Parents: 2afaa92
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Oct 12 09:38:05 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Oct 12 09:38:05 2016 -0400
----------------------------------------------------------------------
.../impl/MessageCounterSerializationTest.java | 50 +++++++++++++++++++
.../sharding/ShardCounterSerializationTest.java | 51 ++++++++++++++++++++
2 files changed, 101 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f56e1b0d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
index a4ea0f1..1f18db7 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
@@ -28,6 +28,11 @@ import org.apache.usergrid.persistence.qakka.serialization.queuemessages.Message
import org.junit.Assert;
import org.junit.Test;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
import static org.junit.Assert.fail;
@@ -87,4 +92,49 @@ public class MessageCounterSerializationTest extends AbstractTest {
Assert.assertEquals( 70, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
}
+
+ @Test
+ public void testConcurrentOperation() {
+
+ // create multiple threads, each will increment and decrement counter by same number
+
+ Injector injector = getInjector();
+ MessageCounterSerialization mcs = injector.getInstance( MessageCounterSerialization.class );
+ String queueName = "mtco_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+
+ int poolSize = 20;
+ int numThreads = 20;
+ int numCounts = 3000;
+ ExecutorService execService = Executors.newFixedThreadPool( poolSize );
+
+ for (int i = 0; i < numThreads; i++) {
+
+ execService.submit( () -> {
+
+ for ( int j = 0; j < numCounts; j++ ) {
+ mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 1 );
+ }
+
+ for ( int k = 0; k < numCounts; k++ ) {
+ mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 1 );
+ }
+ });
+ }
+
+ execService.shutdown();
+
+ try {
+ while (!execService.awaitTermination( 3, TimeUnit.SECONDS )) {
+ System.out.println( "Waiting... " +
+ mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ // at end counter should be zero
+
+ Assert.assertEquals( 0, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+ }
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f56e1b0d/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
index 8dc16bb..bc80943 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
@@ -19,14 +19,21 @@
package org.apache.usergrid.persistence.qakka.serialization.sharding;
+import com.google.inject.Injector;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.usergrid.persistence.qakka.core.CassandraClient;
import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
import org.apache.usergrid.persistence.qakka.AbstractTest;
import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.junit.Assert;
import org.junit.Test;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
import static org.junit.Assert.fail;
@@ -58,4 +65,48 @@ public class ShardCounterSerializationTest extends AbstractTest {
Assert.assertEquals( 210, scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) );
}
+
+ @Test
+ public void testConcurrentOperation() {
+
+ // create multiple threads, each will increment counter by some number
+
+ Injector injector = getInjector();
+ ShardCounterSerialization scs = injector.getInstance( ShardCounterSerialization.class );
+ String queueName = "stco_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+ long shardId = 100L;
+
+ int poolSize = 20;
+ int numThreads = 20;
+ int numCounts = 3000;
+ ExecutorService execService = Executors.newFixedThreadPool( poolSize );
+
+ for (int i = 0; i < numThreads; i++) {
+
+ execService.submit( () -> {
+
+ for ( int j = 0; j < numCounts; j++ ) {
+ scs.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, 1 );
+ }
+
+ });
+ }
+
+ execService.shutdown();
+
+ try {
+ while (!execService.awaitTermination( 3, TimeUnit.SECONDS )) {
+ System.out.println( "Waiting... " +
+ scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) );
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ // test that counter is correct value
+
+ Assert.assertEquals( numThreads * numCounts,
+ scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) );
+ }
+
}