You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/30 21:18:49 UTC
[04/10] usergrid git commit: Complete message counter and add support for getQueueDepth() in QakkaQueueManager
Complete message counter and add support for getQueueDepth() in QakkaQueueManager
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9306f12e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9306f12e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9306f12e
Branch: refs/heads/usergrid-1318-queue
Commit: 9306f12eea8cd8f0ad0b2ec4751cd8a9b9ba5382
Parents: 8b79fb8
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Sep 21 08:34:28 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Sep 21 08:34:28 2016 -0400
----------------------------------------------------------------------
.../usergrid/persistence/qakka/QakkaFig.java | 9 +-
.../usergrid/persistence/qakka/QakkaModule.java | 29 ++--
.../qakka/core/QueueMessageManager.java | 4 +
.../core/impl/QueueMessageManagerImpl.java | 13 +-
.../qakka/distributed/actors/QueueActor.java | 14 +-
.../distributed/actors/QueueTimeouter.java | 8 +
.../qakka/distributed/actors/QueueWriter.java | 138 ++++++++--------
.../impl/DistributedQueueServiceImpl.java | 6 +-
.../MessageCounterSerialization.java | 4 +-
.../impl/MessageCounterSerializationImpl.java | 159 +++++++++++++------
.../queue/impl/QakkaQueueManager.java | 4 +-
.../qakka/core/QueueMessageManagerTest.java | 2 +
.../impl/MessageCounterSerializationTest.java | 90 +++++++++++
.../sharding/ShardCounterSerializationTest.java | 3 -
.../queue/src/test/resources/qakka.properties | 1 +
15 files changed, 347 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index c66001d..c3f4189 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -52,7 +52,9 @@ public interface QakkaFig extends GuicyFig, Serializable {
String QUEUE_GET_TIMEOUT = "queue.get.timeout.seconds";
- String QUEUE_MAX_SHARD_COUNTER = "queue.max.inmemory.shard.counter";
+ String QUEUE_MAX_SHARD_COUNTER = "queue.max.inmemory.max.shard.counter";
+
+ String QUEUE_MAX_MESSAGE_CHANGES = "queue.max.inmemory.max.message.changes";
String QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY = "queue.shard.allocation.check.frequency.millis";
@@ -125,6 +127,11 @@ public interface QakkaFig extends GuicyFig, Serializable {
@Default("100")
long getMaxInMemoryShardCounter();
+ /** Once the number of in-memory message counter changes reaches this value, write the counters to permanent storage */
+ @Key(QUEUE_MAX_MESSAGE_CHANGES)
+ @Default("100")
+ long getMaxInMemoryMessageCounter();
+
/** How often to check whether new shard is needed for each queue */
@Key(QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY)
@Default("5000")
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
index d1d8d7e..e3113e1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
@@ -37,7 +37,9 @@ import org.apache.usergrid.persistence.qakka.distributed.impl.QueueSenderRouterP
import org.apache.usergrid.persistence.qakka.distributed.impl.QueueWriterRouterProducer;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.impl.AuditLogSerializationImpl;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl.MessageCounterSerializationImpl;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl.QueueMessageSerializationImpl;
import org.apache.usergrid.persistence.qakka.serialization.queues.QueueSerialization;
import org.apache.usergrid.persistence.qakka.serialization.queues.impl.QueueSerializationImpl;
@@ -76,23 +78,24 @@ public class QakkaModule extends AbstractModule {
bind( App.class );
- bind( CassandraClient.class ).to( CassandraClientImpl.class );
- bind( MetricsService.class ).to( App.class );
+ bind( CassandraClient.class ).to( CassandraClientImpl.class );
+ bind( MetricsService.class ).to( App.class );
- bind( QueueManager.class ).to( QueueManagerImpl.class );
- bind( QueueSerialization.class ).to( QueueSerializationImpl.class );
+ bind( QueueManager.class ).to( QueueManagerImpl.class );
+ bind( QueueSerialization.class ).to( QueueSerializationImpl.class );
- bind( QueueMessageManager.class ).to( QueueMessageManagerImpl.class );
- bind( QueueMessageSerialization.class ).to( QueueMessageSerializationImpl.class );
+ bind( QueueMessageManager.class ).to( QueueMessageManagerImpl.class );
+ bind( QueueMessageSerialization.class ).to( QueueMessageSerializationImpl.class );
- bind( ShardSerialization.class ).to( ShardSerializationImpl.class );
- bind( ShardStrategy.class ).to( ShardStrategyImpl.class );
+ bind( ShardSerialization.class ).to( ShardSerializationImpl.class );
+ bind( ShardStrategy.class ).to( ShardStrategyImpl.class );
- bind( ShardCounterSerialization.class ).to( ShardCounterSerializationImpl.class );
+ bind( ShardCounterSerialization.class ).to( ShardCounterSerializationImpl.class );
+ bind( MessageCounterSerialization.class ).to( MessageCounterSerializationImpl.class );
- bind( TransferLogSerialization.class ).to( TransferLogSerializationImpl.class );
- bind( AuditLogSerialization.class ).to( AuditLogSerializationImpl.class );
- bind( DistributedQueueService.class ).to( DistributedQueueServiceImpl.class );
+ bind( TransferLogSerialization.class ).to( TransferLogSerializationImpl.class );
+ bind( AuditLogSerialization.class ).to( AuditLogSerializationImpl.class );
+ bind( DistributedQueueService.class ).to( DistributedQueueServiceImpl.class );
bind( QueueActorRouterProducer.class );
bind( QueueWriterRouterProducer.class );
@@ -110,6 +113,6 @@ public class QakkaModule extends AbstractModule {
migrationBinder.addBinding().to( Key.get( ShardCounterSerialization.class ) );
migrationBinder.addBinding().to( Key.get( ShardSerialization.class ) );
migrationBinder.addBinding().to( Key.get( TransferLogSerialization.class ) );
- //migrationBinder.addBinding().to( Key.get( MessageCounterSerialization.class ) );
+ migrationBinder.addBinding().to( Key.get( MessageCounterSerialization.class ) );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
index 15203d8..b540fce 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
@@ -19,6 +19,8 @@
package org.apache.usergrid.persistence.qakka.core;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+
import java.nio.ByteBuffer;
import java.util.List;
import java.util.UUID;
@@ -80,4 +82,6 @@ public interface QueueMessageManager {
* Get message from messages available or messages inflight storage.
*/
QueueMessage getMessage(String queueName, UUID queueMessageId);
+
+ long getQueueDepth(String queueName);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
index bcd0f58..691c1a6 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
@@ -33,6 +33,7 @@ import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
import org.slf4j.Logger;
@@ -58,6 +59,7 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
private final DistributedQueueService distributedQueueService;
private final TransferLogSerialization transferLogSerialization;
private final URIStrategy uriStrategy;
+ private final MessageCounterSerialization messageCounterSerialization;
@Inject
@@ -67,8 +69,8 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
QueueMessageSerialization queueMessageSerialization,
DistributedQueueService distributedQueueService,
TransferLogSerialization transferLogSerialization,
- URIStrategy uriStrategy
- ) {
+ URIStrategy uriStrategy,
+ MessageCounterSerialization messageCounterSerialization ) {
this.actorSystemFig = actorSystemFig;
this.queueManager = queueManager;
@@ -76,6 +78,7 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
this.distributedQueueService = distributedQueueService;
this.transferLogSerialization = transferLogSerialization;
this.uriStrategy = uriStrategy;
+ this.messageCounterSerialization = messageCounterSerialization;
}
@@ -296,4 +299,10 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
return queueMessage;
}
+
+ @Override
+ public long getQueueDepth(String queueName) {
+ return messageCounterSerialization.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT );
+ }
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 6fed13b..3b50711 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -32,6 +32,7 @@ import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
import org.apache.usergrid.persistence.qakka.distributed.messages.*;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
@@ -46,11 +47,13 @@ import java.util.concurrent.TimeUnit;
public class QueueActor extends UntypedActor {
private static final Logger logger = LoggerFactory.getLogger( QueueActor.class );
- private final QakkaFig qakkaFig;
- private final InMemoryQueue inMemoryQueue;
+ private final QakkaFig qakkaFig;
+ private final InMemoryQueue inMemoryQueue;
private final QueueActorHelper queueActorHelper;
private final MetricsService metricsService;
+ private final MessageCounterSerialization messageCounterSerialization;
+
private final Map<String, Cancellable> refreshSchedulersByQueueName = new HashMap<>();
private final Map<String, Cancellable> timeoutSchedulersByQueueName = new HashMap<>();
private final Map<String, Cancellable> shardAllocationSchedulersByQueueName = new HashMap<>();
@@ -68,6 +71,8 @@ public class QueueActor extends UntypedActor {
inMemoryQueue = injector.getInstance( InMemoryQueue.class );
queueActorHelper = injector.getInstance( QueueActorHelper.class );
metricsService = injector.getInstance( MetricsService.class );
+
+ messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
}
@Override
@@ -173,6 +178,11 @@ public class QueueActor extends UntypedActor {
}
}
+ messageCounterSerialization.decrementCounter(
+ queueGetRequest.getQueueName(),
+ DatabaseQueueMessage.Type.DEFAULT,
+ queueMessages.size());
+
getSender().tell( new QueueGetResponse(
DistributedQueueService.Status.SUCCESS, queueMessages ), getSender() );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
index fcd2161..7806d30 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
@@ -33,6 +33,7 @@ import org.apache.usergrid.persistence.qakka.distributed.messages.QueueTimeoutRe
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
@@ -54,6 +55,8 @@ public class QueueTimeouter extends UntypedActor {
private final QakkaFig qakkaFig;
private final CassandraClient cassandraClient;
+ private final MessageCounterSerialization messageCounterSerialization;
+
public QueueTimeouter(String queueName ) {
this.queueName = queueName;
@@ -65,6 +68,8 @@ public class QueueTimeouter extends UntypedActor {
qakkaFig = injector.getInstance( QakkaFig.class );
metricsService = injector.getInstance( MetricsService.class );
cassandraClient = injector.getInstance( CassandraClientImpl.class );
+
+ messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
}
@@ -134,6 +139,9 @@ public class QueueTimeouter extends UntypedActor {
if (count > 0) {
logger.debug( "Timed out {} messages for queue {}", count, queueName );
+
+ messageCounterSerialization.decrementCounter(
+ queueName, DatabaseQueueMessage.Type.DEFAULT, count);
}
} finally {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
index 8657370..7166ef1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -30,6 +30,7 @@ import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResp
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
import org.slf4j.Logger;
@@ -48,6 +49,8 @@ public class QueueWriter extends UntypedActor {
private final AuditLogSerialization auditLogSerialization;
private final MetricsService metricsService;
+ private final MessageCounterSerialization messageCounterSerialization;
+
public QueueWriter() {
@@ -57,96 +60,101 @@ public class QueueWriter extends UntypedActor {
transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
metricsService = injector.getInstance( MetricsService.class );
+
+ messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
}
@Override
public void onReceive(Object message) {
- if (message instanceof QueueWriteRequest) {
-
- Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_WRITE ).time();
+ if (message instanceof QueueWriteRequest) {
- try {
- QueueWriteRequest qa = (QueueWriteRequest) message;
-
- UUID queueMessageId = QakkaUtils.getTimeUuid();
+ Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_WRITE ).time();
- // TODO: implement deliveryTime and expirationTime
+ try {
+ QueueWriteRequest qa = (QueueWriteRequest) message;
- DatabaseQueueMessage dbqm = null;
- long currentTime = System.currentTimeMillis();
+ UUID queueMessageId = QakkaUtils.getTimeUuid();
- try {
- dbqm = new DatabaseQueueMessage(
- qa.getMessageId(),
- DatabaseQueueMessage.Type.DEFAULT,
- qa.getQueueName(),
- qa.getDestRegion(),
- null,
- currentTime,
- currentTime,
- queueMessageId );
+ // TODO: implement deliveryTime and expirationTime
- messageSerialization.writeMessage( dbqm );
+ DatabaseQueueMessage dbqm = null;
+ long currentTime = System.currentTimeMillis();
- //logger.debug("Wrote queue message id {} to queue name {}",
- // dbqm.getQueueMessageId(), dbqm.getQueueName());
+ try {
+ dbqm = new DatabaseQueueMessage(
+ qa.getMessageId(),
+ DatabaseQueueMessage.Type.DEFAULT,
+ qa.getQueueName(),
+ qa.getDestRegion(),
+ null,
+ currentTime,
+ currentTime,
+ queueMessageId );
- } catch (Throwable t) {
- logger.debug("Error creating database queue message", t);
+ messageSerialization.writeMessage( dbqm );
- auditLogSerialization.recordAuditLog(
- AuditLog.Action.SEND,
- AuditLog.Status.ERROR,
- qa.getQueueName(),
- qa.getDestRegion(),
- qa.getMessageId(),
- dbqm.getMessageId() );
+ messageCounterSerialization.incrementCounter(
+ qa.getQueueName(), DatabaseQueueMessage.Type.DEFAULT, 1);
- getSender().tell( new QueueWriteResponse(
- QueueWriter.WriteStatus.ERROR ), getSender() );
+ //logger.debug("Wrote queue message id {} to queue name {}",
+ // dbqm.getQueueMessageId(), dbqm.getQueueName());
- return;
- }
+ } catch (Throwable t) {
+ logger.debug("Error creating database queue message", t);
auditLogSerialization.recordAuditLog(
AuditLog.Action.SEND,
- AuditLog.Status.SUCCESS,
+ AuditLog.Status.ERROR,
qa.getQueueName(),
qa.getDestRegion(),
qa.getMessageId(),
- dbqm.getQueueMessageId() );
-
- try {
- transferLogSerialization.removeTransferLog(
- qa.getQueueName(),
- qa.getSourceRegion(),
- qa.getDestRegion(),
- qa.getMessageId() );
-
- getSender().tell( new QueueWriteResponse(
- QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED ), getSender() );
-
- } catch (Throwable e) {
- logger.debug( "Unable to delete transfer log for {} {} {} {}",
- qa.getQueueName(),
- qa.getSourceRegion(),
- qa.getDestRegion(),
- qa.getMessageId() );
- logger.debug("Error deleting transferlog", e);
-
- getSender().tell( new QueueWriteResponse(
- QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ), getSender() );
- }
-
- } finally {
- timer.close();
+ dbqm.getMessageId() );
+
+ getSender().tell( new QueueWriteResponse(
+ QueueWriter.WriteStatus.ERROR ), getSender() );
+
+ return;
}
- } else {
- unhandled( message );
+ auditLogSerialization.recordAuditLog(
+ AuditLog.Action.SEND,
+ AuditLog.Status.SUCCESS,
+ qa.getQueueName(),
+ qa.getDestRegion(),
+ qa.getMessageId(),
+ dbqm.getQueueMessageId() );
+
+ try {
+ transferLogSerialization.removeTransferLog(
+ qa.getQueueName(),
+ qa.getSourceRegion(),
+ qa.getDestRegion(),
+ qa.getMessageId() );
+
+ getSender().tell( new QueueWriteResponse(
+ QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED ), getSender() );
+
+ } catch (Throwable e) {
+ logger.debug( "Unable to delete transfer log for {} {} {} {}",
+ qa.getQueueName(),
+ qa.getSourceRegion(),
+ qa.getDestRegion(),
+ qa.getMessageId() );
+ logger.debug("Error deleting transferlog", e);
+
+ getSender().tell( new QueueWriteResponse(
+ QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ), getSender() );
+ }
+
+ } finally {
+ timer.close();
}
+ } else {
+ unhandled( message );
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index be20cde..3d6a808 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -34,6 +34,7 @@ import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService
import org.apache.usergrid.persistence.qakka.distributed.messages.*;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
@@ -54,17 +55,20 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
private final ActorSystemManager actorSystemManager;
private final QueueManager queueManager;
private final QakkaFig qakkaFig;
+ private final MessageCounterSerialization messageCounterSerialization;
@Inject
public DistributedQueueServiceImpl(
ActorSystemManager actorSystemManager,
QueueManager queueManager,
- QakkaFig qakkaFig ) {
+ QakkaFig qakkaFig,
+ MessageCounterSerialization messageCounterSerialization ) {
this.actorSystemManager = actorSystemManager;
this.queueManager = queueManager;
this.qakkaFig = qakkaFig;
+ this.messageCounterSerialization = messageCounterSerialization;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
index cbbf11f..6c81863 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
@@ -21,11 +21,11 @@ package org.apache.usergrid.persistence.qakka.serialization.queuemessages;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
-public interface MessageCounterSerialization extends Migration {
+public interface MessageCounterSerialization extends Migration {
void incrementCounter(String queueName, DatabaseQueueMessage.Type type, long increment);
- void decrementCounter(String queueName, DatabaseQueueMessage.Type type, long increment);
+ void decrementCounter(String queueName, DatabaseQueueMessage.Type type, long decrement);
long getCounterValue(String name, DatabaseQueueMessage.Type type);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
index f198d05..0fdb47e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
@@ -32,8 +32,8 @@ import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.core.CassandraClient;
import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,49 +43,56 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@Singleton
-public class MessageCounterSerializationImpl implements ShardCounterSerialization {
+public class MessageCounterSerializationImpl implements MessageCounterSerialization {
private static final Logger logger = LoggerFactory.getLogger( MessageCounterSerializationImpl.class );
private final CassandraClient cassandraClient;
private final CassandraConfig cassandraConfig;
- final static String TABLE_SHARD_COUNTERS = "counters";
- final static String COLUMN_QUEUE_NAME = "queue_name";
- final static String COLUMN_SHARD_ID = "shard_id";
- final static String COLUMN_COUNTER_VALUE = "counter_value";
- final static String COLUMN_SHARD_TYPE = "shard_type";
+ final static String TABLE_MESSAGE_COUNTERS = "message_counters";
+ final static String COLUMN_QUEUE_NAME = "queue_name";
+ final static String COLUMN_COUNTER_VALUE = "counter_value";
+ final static String COLUMN_MESSAGE_TYPE = "message_type";
// design note: counters based on DataStax example here:
// https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_counter_t.html
static final String CQL =
- "CREATE TABLE IF NOT EXISTS shard_counters ( " +
+ "CREATE TABLE IF NOT EXISTS message_counters ( " +
"counter_value counter, " +
"queue_name varchar, " +
- "shard_type varchar, " +
- "shard_id bigint, " +
- "PRIMARY KEY (queue_name, shard_type, shard_id) " +
+ "message_type varchar, " +
+ "PRIMARY KEY (queue_name, message_type) " +
");";
- final long maxInMemoryIncrement;
+ /** number of changes since last save to database */
+ final AtomicInteger numChanges = new AtomicInteger( 0 );
+
+ final long maxChangesBeforeSave;
class InMemoryCount {
long baseCount;
final AtomicLong increment = new AtomicLong( 0L );
+ final AtomicLong decrement = new AtomicLong( 0L );
+
InMemoryCount( long baseCount ) {
this.baseCount = baseCount;
}
- public long value() {
- return baseCount + increment.get();
- }
public AtomicLong getIncrement() {
return increment;
}
+ public AtomicLong getDecrement() {
+ return decrement;
+ }
+ public long value() {
+ return baseCount + increment.get() - decrement.get();
+ }
void setBaseCount( long baseCount ) {
this.baseCount = baseCount;
}
@@ -95,64 +102,89 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
@Inject
- public MessageCounterSerializationImpl( CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
+ public MessageCounterSerializationImpl(
+ CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
+
this.cassandraConfig = cassandraConfig;
- this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter();
+ this.maxChangesBeforeSave = qakkaFig.getMaxInMemoryMessageCounter();
this.cassandraClient = cassandraClient;
}
+ private String buildKey( String queueName, DatabaseQueueMessage.Type type ) {
+ return queueName + "_" + type;
+ }
+
+
@Override
- public void incrementCounter(String queueName, Shard.Type type, long shardId, long increment ) {
+ public void incrementCounter(String queueName, DatabaseQueueMessage.Type type, long increment ) {
+
+ String key = buildKey( queueName, type );
- String key = queueName + type + shardId;
synchronized ( inMemoryCounters ) {
if ( inMemoryCounters.get( key ) == null ) {
- Long value = retrieveCounterFromStorage( queueName, type, shardId );
+ Long value = retrieveCounterFromStorage( queueName, type );
if ( value == null ) {
- incrementCounterInStorage( queueName, type, shardId, 0L );
+ incrementCounterInStorage( queueName, type, 0L );
inMemoryCounters.put( key, new InMemoryCount( 0L ));
} else {
inMemoryCounters.put( key, new InMemoryCount( value ));
}
- inMemoryCounters.get( key ).getIncrement().addAndGet( increment );
- return;
}
}
InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+ inMemoryCount.getIncrement().addAndGet( increment );
- synchronized ( inMemoryCount ) {
- long totalIncrement = inMemoryCount.getIncrement().addAndGet( increment );
+ saveIfNeeded( queueName, type );
+ }
- if (totalIncrement > maxInMemoryIncrement) {
- incrementCounterInStorage( queueName, type, shardId, totalIncrement );
- inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type, shardId ) );
- inMemoryCount.getIncrement().set( 0L );
+
+ @Override
+ public void decrementCounter(String queueName, DatabaseQueueMessage.Type type, long decrement) {
+
+ String key = buildKey( queueName, type );
+
+ synchronized ( inMemoryCounters ) {
+
+ if ( inMemoryCounters.get( key ) == null ) {
+
+ Long value = retrieveCounterFromStorage( queueName, type );
+
+ if ( value == null ) {
+ decrementCounterInStorage( queueName, type, 0L );
+ inMemoryCounters.put( key, new InMemoryCount( 0L ));
+ } else {
+ inMemoryCounters.put( key, new InMemoryCount( value ));
+ }
}
}
+ InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+ inMemoryCount.getDecrement().addAndGet( decrement );
+
+ saveIfNeeded( queueName, type );
}
@Override
- public long getCounterValue( String queueName, Shard.Type type, long shardId ) {
+ public long getCounterValue( String queueName, DatabaseQueueMessage.Type type ) {
- String key = queueName + type + shardId;
+ String key = buildKey( queueName, type );
synchronized ( inMemoryCounters ) {
if ( inMemoryCounters.get( key ) == null ) {
- Long value = retrieveCounterFromStorage( queueName, type, shardId );
+ Long value = retrieveCounterFromStorage( queueName, type );
if ( value == null ) {
throw new NotFoundException(
- MessageFormat.format( "No counter found for queue {0} type {1} shardId {2}",
- queueName, type, shardId ));
+ MessageFormat.format( "No counter found for queue {0} type {1}",
+ queueName, type ));
} else {
inMemoryCounters.put( key, new InMemoryCount( value ));
}
@@ -162,30 +194,39 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
return inMemoryCounters.get( key ).value();
}
- void incrementCounterInStorage( String queueName, Shard.Type type, long shardId, long increment ) {
- Statement update = QueryBuilder.update( TABLE_SHARD_COUNTERS )
+ void incrementCounterInStorage( String queueName, DatabaseQueueMessage.Type type, long increment ) {
+
+ Statement update = QueryBuilder.update( TABLE_MESSAGE_COUNTERS )
.where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) )
- .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString() ) )
- .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) )
+ .and( QueryBuilder.eq( COLUMN_MESSAGE_TYPE, type.toString() ) )
.with( QueryBuilder.incr( COLUMN_COUNTER_VALUE, increment ) );
cassandraClient.getQueueMessageSession().execute( update );
}
- Long retrieveCounterFromStorage( String queueName, Shard.Type type, long shardId ) {
+ void decrementCounterInStorage( String queueName, DatabaseQueueMessage.Type type, long decrement ) {
- Statement query = QueryBuilder.select().from( TABLE_SHARD_COUNTERS )
+ Statement update = QueryBuilder.update( TABLE_MESSAGE_COUNTERS )
+ .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) )
+ .and( QueryBuilder.eq( COLUMN_MESSAGE_TYPE, type.toString() ) )
+ .with( QueryBuilder.decr( COLUMN_COUNTER_VALUE, decrement ) );
+ cassandraClient.getQueueMessageSession().execute( update );
+ }
+
+
+ Long retrieveCounterFromStorage( String queueName, DatabaseQueueMessage.Type type ) {
+
+ Statement query = QueryBuilder.select().from( TABLE_MESSAGE_COUNTERS )
.where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) )
- .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString()) )
- .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) );
+ .and( QueryBuilder.eq( COLUMN_MESSAGE_TYPE, type.toString()) );
ResultSet resultSet = cassandraClient.getQueueMessageSession().execute( query );
List<Row> all = resultSet.all();
if ( all.size() > 1 ) {
throw new QakkaRuntimeException(
- "Multiple rows for counter " + queueName + " type " + type + " shardId " + shardId );
+ "Multiple rows for counter " + queueName + " type " + type );
}
if ( all.isEmpty() ) {
return null;
@@ -194,6 +235,32 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
}
+ private void saveIfNeeded( String queueName, DatabaseQueueMessage.Type type ) {
+
+ String key = buildKey( queueName, type );
+
+ InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+
+ synchronized ( inMemoryCount ) {
+
+ if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) {
+
+ long totalIncrement = inMemoryCount.getIncrement().get();
+ incrementCounterInStorage( queueName, type, totalIncrement );
+
+ long totalDecrement = inMemoryCount.getDecrement().get();
+ decrementCounterInStorage( queueName, type, totalDecrement );
+
+ inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type ) );
+ inMemoryCount.getIncrement().set( 0L );
+ inMemoryCount.getDecrement().set( 0L );
+
+ numChanges.set( 0 );
+ }
+ }
+ }
+
+
@Override
public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
return Collections.EMPTY_LIST;
@@ -201,8 +268,8 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
@Override
public Collection<TableDefinition> getTables() {
- return Collections.singletonList(
- new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(), TABLE_SHARD_COUNTERS, CQL ) );
+ return Collections.singletonList( new TableDefinitionStringImpl(
+ cassandraConfig.getApplicationLocalKeyspace(), TABLE_MESSAGE_COUNTERS, CQL ) );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
index 0eb609d..f3cae86 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
@@ -24,6 +24,7 @@ import com.google.inject.assistedinject.Assisted;
import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.core.*;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.apache.usergrid.persistence.queue.LegacyQueueFig;
import org.apache.usergrid.persistence.queue.LegacyQueueManager;
import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
@@ -139,10 +140,9 @@ public class QakkaQueueManager implements LegacyQueueManager {
return messages;
}
-
@Override
public long getQueueDepth() {
- return 0;
+ return queueMessageManager.getQueueDepth( scope.getName() );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index 0413f81..3225a66 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -177,6 +177,8 @@ public class QueueMessageManagerTest extends AbstractTest {
}
}
+ Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName ) );
+
// get all messages from queue
List<QueueMessage> messages = qmm.getNextMessages( queueName, numMessages );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
new file mode 100644
index 0000000..a4ea0f1
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl;
+
+import com.google.inject.Injector;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+
+/**
+ * Created by Dave Johnson (snoopdave@apache.org) on 9/20/16.
+ */
+public class MessageCounterSerializationTest extends AbstractTest {
+
+ @Test
+ public void testBasicOperation() {
+
+ Injector injector = getInjector();
+ MessageCounterSerialization mcs = injector.getInstance( MessageCounterSerialization.class );
+
+ String queueName = "mcst_queue_" + RandomStringUtils.randomAlphanumeric( 20 );
+
+ try {
+ mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT );
+ fail("Should have throw NotFoundException");
+ } catch ( NotFoundException expected ) {
+ // pass
+ }
+
+ for ( int i=0; i<10; i++ ) {
+ mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 1 );
+ Assert.assertEquals( i+1, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+ }
+
+ mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+ Assert.assertEquals( 0, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+ mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+ Assert.assertEquals( 10, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+ mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+ Assert.assertEquals( 20, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+ mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+ Assert.assertEquals( 30, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+ mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+ Assert.assertEquals( 40, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+ mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+ Assert.assertEquals( 50, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+ mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 50 );
+ Assert.assertEquals( 100, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+ mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+ Assert.assertEquals( 90, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+ mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+ Assert.assertEquals( 80, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+ mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+ Assert.assertEquals( 70, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
index f9c2951..8dc16bb 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
@@ -36,9 +36,6 @@ public class ShardCounterSerializationTest extends AbstractTest {
@Test
public void testBasicOperation() throws Exception {
- CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
-
-
ShardCounterSerialization scs = getInjector().getInstance( ShardCounterSerialization.class );
String queueName = "scst_queue_" + RandomStringUtils.randomAlphanumeric( 20 );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index aacc187..fb46f3d 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -44,6 +44,7 @@ queue.shard.allocation.check.frequency.millis=100
queue.shard.allocation.advance.time.millis=200
queue.max.inmemory.shard.counter = 100
+queue.max.inmemory.max.message.changes=3
queue.long.polling.time.millis=2000