You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/30 21:18:55 UTC
[10/10] usergrid git commit: Change to use proper Guice injection
instead of static injector kludge.
Change to use proper Guice injection instead of static injector kludge.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5a19ba9a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5a19ba9a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5a19ba9a
Branch: refs/heads/usergrid-1318-queue
Commit: 5a19ba9a748c5d96f5356d77df1e7845aa92fc0f
Parents: 2cd8ecb
Author: Dave Johnson <sn...@apache.org>
Authored: Fri Sep 30 17:18:24 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Fri Sep 30 17:18:24 2016 -0400
----------------------------------------------------------------------
build.log | 15 ++
.../apache/usergrid/persistence/qakka/App.java | 17 +-
.../qakka/core/impl/InMemoryQueue.java | 11 +-
.../core/impl/QueueMessageManagerImpl.java | 18 +-
.../distributed/DistributedQueueService.java | 2 +-
.../qakka/distributed/actors/QueueActor.java | 56 +++---
.../distributed/actors/QueueActorHelper.java | 105 ++++++++++-
.../distributed/actors/QueueActorRouter.java | 11 +-
.../distributed/actors/QueueRefresher.java | 120 +-----------
.../qakka/distributed/actors/QueueSender.java | 5 +-
.../distributed/actors/QueueSenderRouter.java | 10 +-
.../distributed/actors/QueueTimeouter.java | 26 +--
.../qakka/distributed/actors/QueueWriter.java | 6 +-
.../distributed/actors/QueueWriterRouter.java | 11 +-
.../distributed/actors/ShardAllocator.java | 22 +--
.../impl/DistributedQueueServiceImpl.java | 26 ++-
.../qakka/core/QueueMessageManagerTest.java | 178 +++++++++---------
.../distributed/QueueActorServiceTest.java | 112 ++++++-----
.../actors/QueueActorHelperTest.java | 186 +++++++++++--------
.../distributed/actors/QueueReaderTest.java | 11 +-
.../distributed/actors/QueueTimeouterTest.java | 7 +-
.../distributed/actors/ShardAllocatorTest.java | 30 +--
.../queue/src/test/resources/log4j.properties | 6 +-
23 files changed, 531 insertions(+), 460 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/build.log
----------------------------------------------------------------------
diff --git a/build.log b/build.log
new file mode 100644
index 0000000..43ffacd
--- /dev/null
+++ b/build.log
@@ -0,0 +1,15 @@
+[INFO] Scanning for projects...
+[INFO] ------------------------------------------------------------------------
+[INFO] BUILD FAILURE
+[INFO] ------------------------------------------------------------------------
+[INFO] Total time: 0.086 s
+[INFO] Finished at: 2016-09-30T07:54:28-04:00
+[INFO] Final Memory: 46M/6710M
+[INFO] ------------------------------------------------------------------------
+[ERROR] The goal you specified requires a project to execute but there is no POM in this directory (/Users/ApigeeCorporation/src/usergrid-snoopdave). Please verify you invoked Maven from the correct directory. -> [Help 1]
+[ERROR]
+[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
+[ERROR] Re-run Maven using the -X switch to enable full debug logging.
+[ERROR]
+[ERROR] For more information about the errors and possible solutions, please read the following articles:
+[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MissingProjectException
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
index abbf3da..35fdb20 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
@@ -44,9 +44,6 @@ import org.slf4j.LoggerFactory;
public class App implements MetricsService {
private static final Logger logger = LoggerFactory.getLogger( App.class );
- // TODO: can we avoid this kludge with better Akka-Guice integration?
- static public Injector INJECTOR;
-
private final ActorSystemFig actorSystemFig;
private final ActorSystemManager actorSystemManager;
private final DistributedQueueService distributedQueueService;
@@ -55,14 +52,16 @@ public class App implements MetricsService {
@Inject
public App(
- Injector injector,
QakkaFig qakkaFig,
ActorSystemFig actorSystemFig,
ActorSystemManager actorSystemManager,
DistributedQueueService distributedQueueService,
- MigrationManager migrationManager) {
+ MigrationManager migrationManager,
+ QueueActorRouterProducer queueActorRouterProducer,
+ QueueWriterRouterProducer queueWriterRouterProducer,
+ QueueSenderRouterProducer queueSenderRouterProducer
+ ) {
- this.INJECTOR = injector;
this.actorSystemFig = actorSystemFig;
this.actorSystemManager = actorSystemManager;
this.distributedQueueService = distributedQueueService;
@@ -74,9 +73,9 @@ public class App implements MetricsService {
} catch (MigrationException e) {
throw new QakkaRuntimeException( "Error running migration", e );
}
- actorSystemManager.registerRouterProducer( injector.getInstance( QueueActorRouterProducer.class ) );
- actorSystemManager.registerRouterProducer( injector.getInstance( QueueWriterRouterProducer.class ) );
- actorSystemManager.registerRouterProducer( injector.getInstance( QueueSenderRouterProducer.class ) );
+ actorSystemManager.registerRouterProducer( queueActorRouterProducer );
+ actorSystemManager.registerRouterProducer( queueWriterRouterProducer );
+ actorSystemManager.registerRouterProducer( queueSenderRouterProducer );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
index 27de079..1f6fe6e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
@@ -19,8 +19,10 @@
package org.apache.usergrid.persistence.qakka.core.impl;
+import com.datastax.driver.core.utils.UUIDs;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.distributed.actors.QueueRefresher;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
@@ -71,7 +73,14 @@ public class InMemoryQueue {
}
public UUID getNewest( String queueName ) {
- return newestByQueueName.get( queueName );
+ UUID newest = newestByQueueName.get( queueName );
+// if ( newest == null ) {
+// // Create oldest UUID from a UNIX timestamp via DataStax utility
+// // https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/utils/UUIDs.html
+// newest = UUIDs.startOf( 0L );
+// newestByQueueName.put( queueName, newest );
+// }
+ return newest;
}
public DatabaseQueueMessage poll( String queueName ) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
index 691c1a6..59e0ce0 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
@@ -138,10 +138,6 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
@Override
public List<QueueMessage> getNextMessages(String queueName, int count) {
- if ( queueManager.getQueueConfig( queueName ) == null ) {
- throw new NotFoundException( "Queue not found: " + queueName );
- }
-
Collection<DatabaseQueueMessage> dbMessages = distributedQueueService.getNextMessages( queueName, count );
List<QueueMessage> queueMessages = joinMessages( queueName, dbMessages );
@@ -210,15 +206,14 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
@Override
public void ackMessage(String queueName, UUID queueMessageId) {
- if ( queueManager.getQueueConfig( queueName ) == null ) {
- throw new NotFoundException( "Queue not found: " + queueName );
- }
-
DistributedQueueService.Status status = distributedQueueService.ackMessage( queueName, queueMessageId );
- if ( DistributedQueueService.Status.BAD_REQUEST.equals( status )) {
+ if ( DistributedQueueService.Status.NOT_INFLIGHT.equals( status )) {
throw new BadRequestException( "Message not inflight" );
+ } else if ( DistributedQueueService.Status.BAD_REQUEST.equals( status )) {
+ throw new BadRequestException( "Bad request" );
+
} else if ( DistributedQueueService.Status.ERROR.equals( status )) {
throw new QakkaRuntimeException( "Unable to ack message due to error" );
}
@@ -228,10 +223,6 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
@Override
public void requeueMessage(String queueName, UUID messageId, Long delayMs) {
- if ( queueManager.getQueueConfig( queueName ) == null ) {
- throw new NotFoundException( "Queue not found: " + queueName );
- }
-
// TODO: implement requeueMessage
throw new UnsupportedOperationException( "requeueMessage not yet implemented" );
@@ -268,7 +259,6 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
// first look in INFLIGHT storage
-
DatabaseQueueMessage dbMessage = queueMessageSerialization.loadMessage(
queueName, actorSystemFig.getRegionLocal(), null,
DatabaseQueueMessage.Type.INFLIGHT, queueMessageId );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
index b02a623..b11dcff 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
@@ -30,7 +30,7 @@ import java.util.UUID;
*/
public interface DistributedQueueService {
- enum Status { SUCCESS, ERROR, BAD_REQUEST };
+ enum Status { SUCCESS, ERROR, BAD_REQUEST, NOT_INFLIGHT };
void init();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 87342ad..5ebba3d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -24,7 +24,9 @@ import akka.actor.Cancellable;
import akka.actor.Props;
import akka.actor.UntypedActor;
import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
import org.apache.usergrid.persistence.qakka.App;
import org.apache.usergrid.persistence.qakka.MetricsService;
import org.apache.usergrid.persistence.qakka.QakkaFig;
@@ -65,10 +67,13 @@ public class QueueActor extends UntypedActor {
private final AtomicLong messageCount = new AtomicLong(0);
private final Set<String> queuesSeen = new HashSet<>();
+ private final Injector injector;
- public QueueActor() {
- Injector injector = App.INJECTOR;
+ @Inject
+ public QueueActor( Injector injector ) {
+
+ this.injector = injector;
qakkaFig = injector.getInstance( QakkaFig.class );
inMemoryQueue = injector.getInstance( InMemoryQueue.class );
@@ -107,7 +112,7 @@ public class QueueActor extends UntypedActor {
getContext().dispatcher(),
getSelf());
timeoutSchedulersByQueueName.put( request.getQueueName(), scheduler );
- logger.debug("Created scheduler for queue {}", request.getQueueName() );
+ logger.debug("Created timeouter for queue {}", request.getQueueName() );
}
if ( shardAllocationSchedulersByQueueName.get( request.getQueueName() ) == null ) {
@@ -126,18 +131,20 @@ public class QueueActor extends UntypedActor {
QueueRefreshRequest request = (QueueRefreshRequest)message;
queuesSeen.add( request.getQueueName() );
-
- if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) {
-
- if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) {
- ActorRef readerRef = getContext().actorOf( Props.create(
- QueueRefresher.class, request.getQueueName()), request.getQueueName() + "_reader");
- queueReadersByQueueName.put( request.getQueueName(), readerRef );
- }
- }
-
- // hand-off to queue's reader
- queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() );
+ queueActorHelper.queueRefresh( request.getQueueName() );
+
+// if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) {
+//
+// if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) {
+// ActorRef readerRef = getContext().actorOf(
+// Props.create( GuiceActorProducer.class, injector, QueueRefresher.class ),
+// request.getQueueName() + "_reader");
+// queueReadersByQueueName.put( request.getQueueName(), readerRef );
+// }
+// }
+//
+// // hand-off to queue's reader
+// queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() );
} else if ( message instanceof QueueTimeoutRequest ) {
QueueTimeoutRequest request = (QueueTimeoutRequest)message;
@@ -145,8 +152,9 @@ public class QueueActor extends UntypedActor {
queuesSeen.add( request.getQueueName() );
if ( queueTimeoutersByQueueName.get( request.getQueueName() ) == null ) {
- ActorRef readerRef = getContext().actorOf( Props.create(
- QueueTimeouter.class, request.getQueueName()), request.getQueueName() + "_timeouter");
+ ActorRef readerRef = getContext().actorOf(
+ Props.create( GuiceActorProducer.class, injector, QueueTimeouter.class),
+ request.getQueueName() + "_timeouter");
queueTimeoutersByQueueName.put( request.getQueueName(), readerRef );
}
@@ -160,8 +168,9 @@ public class QueueActor extends UntypedActor {
queuesSeen.add( request.getQueueName() );
if ( shardAllocatorsByQueueName.get( request.getQueueName() ) == null ) {
- ActorRef readerRef = getContext().actorOf( Props.create(
- ShardAllocator.class, request.getQueueName()), request.getQueueName() + "_shard_allocator");
+ ActorRef readerRef = getContext().actorOf(
+ Props.create( GuiceActorProducer.class, injector, ShardAllocator.class),
+ request.getQueueName() + "_shard_allocator");
shardAllocatorsByQueueName.put( request.getQueueName(), readerRef );
}
@@ -181,15 +190,15 @@ public class QueueActor extends UntypedActor {
while (queueMessages.size() < queueGetRequest.getNumRequested()) {
- DatabaseQueueMessage queueMessage = inMemoryQueue.poll( queueGetRequest.getQueueName() );
+ DatabaseQueueMessage queueMessage = inMemoryQueue.peek( queueGetRequest.getQueueName() );
if (queueMessage != null) {
if (queueActorHelper.putInflight( queueGetRequest.getQueueName(), queueMessage )) {
queueMessages.add( queueMessage );
}
} else {
-// logger.debug("in-memory queue for {} is empty, object is: {}",
-// queueGetRequest.getQueueName(), inMemoryQueue );
+ logger.debug("in-memory queue for {} is empty, object is: {}",
+ queueGetRequest.getQueueName(), inMemoryQueue );
break;
}
}
@@ -199,6 +208,9 @@ public class QueueActor extends UntypedActor {
DatabaseQueueMessage.Type.DEFAULT,
queueMessages.size());
+ logger.debug("{} returning {} for queue {}",
+ this, queueMessages.size(), queueGetRequest.getQueueName());
+
getSender().tell( new QueueGetResponse(
DistributedQueueService.Status.SUCCESS, queueMessages ), getSender() );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
index 26db903..68250df 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
@@ -19,17 +19,28 @@
package org.apache.usergrid.persistence.qakka.distributed.actors;
+import com.codahale.metrics.Timer;
import com.google.inject.Inject;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.text.DecimalFormat;
+import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
public class QueueActorHelper {
@@ -38,18 +49,33 @@ public class QueueActorHelper {
private final ActorSystemFig actorSystemFig;
private final QueueMessageSerialization messageSerialization;
private final AuditLogSerialization auditLogSerialization;
+ private final InMemoryQueue inMemoryQueue;
+ private final QakkaFig qakkaFig;
+ private final MetricsService metricsService;
+ private final CassandraClient cassandraClient;
+
+ private final AtomicLong runCount = new AtomicLong(0);
+ private final AtomicLong totalRead = new AtomicLong(0);
@Inject
public QueueActorHelper(
- ActorSystemFig actorSystemFig,
+ QakkaFig qakkaFig,
+ ActorSystemFig actorSystemFig,
QueueMessageSerialization messageSerialization,
- AuditLogSerialization auditLogSerialization
+ AuditLogSerialization auditLogSerialization,
+ InMemoryQueue inMemoryQueue,
+ MetricsService metricsService,
+ CassandraClient cassandraClient
) {
- this.actorSystemFig = actorSystemFig;
- this.messageSerialization = messageSerialization;
+ this.actorSystemFig = actorSystemFig;
+ this.messageSerialization = messageSerialization;
this.auditLogSerialization = auditLogSerialization;
+ this.inMemoryQueue = inMemoryQueue;
+ this.qakkaFig = qakkaFig;
+ this.metricsService = metricsService;
+ this.cassandraClient = cassandraClient;
}
@@ -78,7 +104,7 @@ public class QueueActorHelper {
queueName, queueMessageId, DatabaseQueueMessage.Type.INFLIGHT );
if ( queueMessage == null ) {
- return DistributedQueueService.Status.BAD_REQUEST;
+ return DistributedQueueService.Status.NOT_INFLIGHT;
}
boolean error = false;
@@ -164,4 +190,73 @@ public class QueueActorHelper {
return true;
}
+
+ void queueRefresh( String queueName ) {
+
+ Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time();
+
+ try {
+
+ if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) {
+
+ ShardIterator shardIterator = new ShardIterator(
+ cassandraClient, queueName, actorSystemFig.getRegionLocal(),
+ Shard.Type.DEFAULT, Optional.empty() );
+
+ UUID since = inMemoryQueue.getNewest( queueName );
+
+ String region = actorSystemFig.getRegionLocal();
+ MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
+ cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT,
+ shardIterator, since);
+
+ int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName );
+ int count = 0;
+
+ while ( multiShardIterator.hasNext() && count < need ) {
+ DatabaseQueueMessage queueMessage = multiShardIterator.next();
+ inMemoryQueue.add( queueName, queueMessage );
+ count++;
+ }
+
+ long runs = runCount.incrementAndGet();
+ long readCount = totalRead.addAndGet( count );
+
+ if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
+
+ final DecimalFormat format = new DecimalFormat("##.###");
+ final long nano = 1000000000;
+ Timer t = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME );
+
+ logger.debug("QueueRefresher for queue '{}' stats:\n" +
+ " Num runs={}\n" +
+ " Read count={}\n" +
+ " Mean={}\n" +
+ " One min rate={}\n" +
+ " Five min rate={}\n" +
+ " Snapshot mean={}\n" +
+ " Snapshot min={}\n" +
+ " Snapshot max={}",
+ queueName,
+ t.getCount(),
+ readCount,
+ format.format( t.getMeanRate() ),
+ format.format( t.getOneMinuteRate() ),
+ format.format( t.getFiveMinuteRate() ),
+ format.format( t.getSnapshot().getMean() / nano ),
+ format.format( (double) t.getSnapshot().getMin() / nano ),
+ format.format( (double) t.getSnapshot().getMax() / nano ) );
+ }
+
+ if ( count > 0 ) {
+ logger.debug( "Added {} in-memory for queue {}, new size = {}",
+ count, queueName, inMemoryQueue.size( queueName ) );
+ }
+ }
+
+ } finally {
+ timer.close();
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
index 97e591c..9257a0d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
@@ -24,6 +24,9 @@ import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.routing.ConsistentHashingRouter;
import akka.routing.FromConfig;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
import org.apache.usergrid.persistence.qakka.distributed.messages.*;
@@ -35,9 +38,11 @@ public class QueueActorRouter extends UntypedActor {
private final ActorRef routerRef;
- public QueueActorRouter() {
- routerRef = getContext().actorOf(
- FromConfig.getInstance().props( Props.create(QueueActor.class)), "router");
+ @Inject
+ public QueueActorRouter( Injector injector ) {
+
+ this.routerRef = getContext().actorOf( FromConfig.getInstance().props(
+ Props.create(GuiceActorProducer.class, injector, QueueActor.class)), "router");
}
@Override
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
index dbd5235..2f70088 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
@@ -20,55 +20,20 @@
package org.apache.usergrid.persistence.qakka.distributed.actors;
import akka.actor.UntypedActor;
-import com.codahale.metrics.Timer;
-import com.google.inject.Injector;
-import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
-import org.apache.usergrid.persistence.qakka.App;
-import org.apache.usergrid.persistence.qakka.MetricsService;
-import org.apache.usergrid.persistence.qakka.QakkaFig;
-import org.apache.usergrid.persistence.qakka.core.CassandraClient;
-import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
-import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
+import com.google.inject.Inject;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest;
-import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
-import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
-import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
-import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.text.DecimalFormat;
-import java.util.Optional;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-
public class QueueRefresher extends UntypedActor {
private static final Logger logger = LoggerFactory.getLogger( QueueRefresher.class );
- private final String queueName;
- private final InMemoryQueue inMemoryQueue;
- private final QakkaFig qakkaFig;
- private final ActorSystemFig actorSystemFig;
- private final MetricsService metricsService;
- private final CassandraClient cassandraClient;
-
- private final AtomicLong runCount = new AtomicLong(0);
- private final AtomicLong totalRead = new AtomicLong(0);
-
+ final QueueActorHelper helper;
- public QueueRefresher(String queueName ) {
- this.queueName = queueName;
-
- Injector injector = App.INJECTOR;
-
- inMemoryQueue = injector.getInstance( InMemoryQueue.class );
- qakkaFig = injector.getInstance( QakkaFig.class );
- actorSystemFig = injector.getInstance( ActorSystemFig.class );
- metricsService = injector.getInstance( MetricsService.class );
- cassandraClient = injector.getInstance( CassandraClientImpl.class );
+ @Inject
+ public QueueRefresher( QueueActorHelper helper ) {
+ this.helper = helper;
}
@@ -78,78 +43,9 @@ public class QueueRefresher extends UntypedActor {
if ( message instanceof QueueRefreshRequest ) {
QueueRefreshRequest request = (QueueRefreshRequest) message;
-
- //logger.debug( "running for queue {}", queueName );
-
- if (!request.getQueueName().equals( queueName )) {
- throw new QakkaRuntimeException(
- "QueueWriter for " + queueName + ": Incorrect queueName " + request.getQueueName() );
- }
-
- Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time();
-
- try {
-
- if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) {
-
- ShardIterator shardIterator = new ShardIterator(
- cassandraClient, queueName, actorSystemFig.getRegionLocal(),
- Shard.Type.DEFAULT, Optional.empty() );
-
- UUID since = inMemoryQueue.getNewest( queueName );
-
- String region = actorSystemFig.getRegionLocal();
- MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
- cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT,
- shardIterator, since);
-
- int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName );
- int count = 0;
-
- while ( multiShardIterator.hasNext() && count < need ) {
- DatabaseQueueMessage queueMessage = multiShardIterator.next();
- inMemoryQueue.add( queueName, queueMessage );
- count++;
- }
-
- long runs = runCount.incrementAndGet();
- long readCount = totalRead.addAndGet( count );
-
- if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
-
- final DecimalFormat format = new DecimalFormat("##.###");
- final long nano = 1000000000;
- Timer t = metricsService.getMetricRegistry().timer(MetricsService.REFRESH_TIME );
-
- logger.debug("QueueRefresher for queue '{}' stats:\n" +
- " Num runs={}\n" +
- " Read count={}\n" +
- " Mean={}\n" +
- " One min rate={}\n" +
- " Five min rate={}\n" +
- " Snapshot mean={}\n" +
- " Snapshot min={}\n" +
- " Snapshot max={}",
- queueName,
- t.getCount(),
- readCount,
- format.format( t.getMeanRate() ),
- format.format( t.getOneMinuteRate() ),
- format.format( t.getFiveMinuteRate() ),
- format.format( t.getSnapshot().getMean() / nano ),
- format.format( (double) t.getSnapshot().getMin() / nano ),
- format.format( (double) t.getSnapshot().getMax() / nano ) );
- }
-
-// if ( count > 0 ) {
-// logger.debug( "Added {} in-memory for queue {}, new size = {}",
-// count, queueName, inMemoryQueue.size( queueName ) );
-// }
- }
-
- } finally {
- timer.close();
- }
+ logger.debug( "running for queue {}", request.getQueueName() );
+ String queueName = request.getQueueName();
+ helper.queueRefresh( queueName );
} else {
unhandled( message );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
index 03d1216..739e1c4 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
@@ -25,6 +25,7 @@ import akka.cluster.client.ClusterClient;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
@@ -63,9 +64,9 @@ public class QueueSender extends UntypedActor {
private final QakkaFig qakkaFig;
private final MetricsService metricsService;
- public QueueSender() {
- Injector injector = App.INJECTOR;
+ @Inject
+ public QueueSender( Injector injector ) {
actorSystemManager = injector.getInstance( ActorSystemManager.class );
transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
index 20603a5..92d0785 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
@@ -23,6 +23,9 @@ import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.routing.FromConfig;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendRequest;
@@ -34,10 +37,11 @@ public class QueueSenderRouter extends UntypedActor {
private final ActorRef router;
- public QueueSenderRouter() {
+ @Inject
+ public QueueSenderRouter( Injector injector ) {
- router = getContext().actorOf(
- FromConfig.getInstance().props(Props.create(QueueSender.class )), "router");
+ this.router = getContext().actorOf( FromConfig.getInstance().props(
+ Props.create( GuiceActorProducer.class, injector, QueueSender.class )), "router");
}
@Override
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
index b47aac6..9b11277 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
import akka.actor.UntypedActor;
import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
import org.apache.usergrid.persistence.qakka.App;
@@ -49,8 +50,6 @@ import java.util.concurrent.atomic.AtomicLong;
public class QueueTimeouter extends UntypedActor {
private static final Logger logger = LoggerFactory.getLogger( QueueTimeouter.class );
- private final String queueName;
-
private final QueueMessageSerialization messageSerialization;
private final MetricsService metricsService;
private final ActorSystemFig actorSystemFig;
@@ -62,10 +61,8 @@ public class QueueTimeouter extends UntypedActor {
private final AtomicLong totalTimedout = new AtomicLong(0);
- public QueueTimeouter(String queueName ) {
- this.queueName = queueName;
-
- Injector injector = App.INJECTOR;
+ @Inject
+ public QueueTimeouter( Injector injector) {
messageSerialization = injector.getInstance( QueueMessageSerialization.class );
actorSystemFig = injector.getInstance( ActorSystemFig.class );
@@ -88,10 +85,7 @@ public class QueueTimeouter extends UntypedActor {
QueueTimeoutRequest request = (QueueTimeoutRequest) message;
- if (!request.getQueueName().equals( queueName )) {
- throw new QakkaRuntimeException(
- "QueueTimeouter for " + queueName + ": Incorrect queueName " + request.getQueueName() );
- }
+ String queueName = request.getQueueName();
//logger.debug("Processing timeouts for queue {} ", queueName );
@@ -171,12 +165,12 @@ public class QueueTimeouter extends UntypedActor {
format.format( (double) t.getSnapshot().getMax() / nano ) );
}
-// if (count > 0) {
-// logger.debug( "Timed out {} messages for queue {}", count, queueName );
-//
-// messageCounterSerialization.decrementCounter(
-// queueName, DatabaseQueueMessage.Type.DEFAULT, count);
-// }
+ if (count > 0) {
+ logger.debug( "Timed out {} messages for queue {}", count, queueName );
+
+ messageCounterSerialization.decrementCounter(
+ queueName, DatabaseQueueMessage.Type.DEFAULT, count);
+ }
} finally {
timer.close();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
index e54d916..273f0b2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
import akka.actor.UntypedActor;
import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.usergrid.persistence.qakka.App;
import org.apache.usergrid.persistence.qakka.MetricsService;
@@ -54,9 +55,8 @@ public class QueueWriter extends UntypedActor {
private final MessageCounterSerialization messageCounterSerialization;
- public QueueWriter() {
-
- Injector injector = App.INJECTOR;
+ @Inject
+ public QueueWriter( Injector injector ) {
messageSerialization = injector.getInstance( QueueMessageSerialization.class );
transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
index 9cf06d9..f0540af 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
@@ -23,6 +23,9 @@ import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.routing.FromConfig;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
@@ -33,11 +36,11 @@ public class QueueWriterRouter extends UntypedActor {
private final ActorRef router;
+ @Inject
+ public QueueWriterRouter( Injector injector ) {
- public QueueWriterRouter() {
-
- router = getContext().actorOf(
- FromConfig.getInstance().props(Props.create(QueueWriter.class )), "router");
+ this.router = getContext().actorOf( FromConfig.getInstance().props(
+ Props.create( GuiceActorProducer.class, injector, QueueWriter.class )), "router");
}
@Override
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
index 46e4906..65c3370 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
import akka.actor.UntypedActor;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
import org.apache.usergrid.persistence.qakka.App;
@@ -49,8 +50,6 @@ import java.util.UUID;
public class ShardAllocator extends UntypedActor {
private static final Logger logger = LoggerFactory.getLogger( ShardAllocator.class );
- private final String queueName;
-
private final QakkaFig qakkaFig;
private final ActorSystemFig actorSystemFig;
private final ShardSerialization shardSerialization;
@@ -59,10 +58,8 @@ public class ShardAllocator extends UntypedActor {
private final CassandraClient cassandraClient;
- public ShardAllocator( String queueName ) {
- this.queueName = queueName;
-
- Injector injector = App.INJECTOR;
+ @Inject
+ public ShardAllocator( Injector injector ) {
this.qakkaFig = injector.getInstance( QakkaFig.class );
this.shardCounterSerialization = injector.getInstance( ShardCounterSerializationImpl.class );
@@ -70,8 +67,6 @@ public class ShardAllocator extends UntypedActor {
this.actorSystemFig = injector.getInstance( ActorSystemFig.class );
this.metricsService = injector.getInstance( MetricsService.class );
this.cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
- logger.debug( "Created shard allocator for queue {}", queueName );
}
@@ -82,14 +77,9 @@ public class ShardAllocator extends UntypedActor {
ShardCheckRequest request = (ShardCheckRequest) message;
- if (!request.getQueueName().equals( queueName )) {
- throw new QakkaRuntimeException(
- "ShardAllocator for " + queueName + ": Incorrect queueName " + request.getQueueName() );
- }
-
// check both types of shard
- checkLatestShard( Shard.Type.DEFAULT );
- checkLatestShard( Shard.Type.INFLIGHT );
+ checkLatestShard( request.getQueueName(), Shard.Type.DEFAULT );
+ checkLatestShard( request.getQueueName(), Shard.Type.INFLIGHT );
} else {
unhandled( message );
@@ -97,7 +87,7 @@ public class ShardAllocator extends UntypedActor {
}
- private void checkLatestShard( Shard.Type type ) {
+ private void checkLatestShard( String queueName, Shard.Type type ) {
Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ALLOCATE_TIME).time();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index bcb6b79..e24bdb4 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.core.QueueManager;
import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
import org.apache.usergrid.persistence.qakka.distributed.messages.*;
+import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
@@ -134,9 +135,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
String queueName, String sourceRegion, String destRegion, UUID messageId,
Long deliveryTime, Long expirationTime ) {
- List<String> queueNames = queueManager.getListOfQueues();
- if ( !queueNames.contains( queueName ) ) {
- throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+ if ( queueManager.getQueueConfig( queueName ) == null ) {
+ throw new NotFoundException( "Queue not found: " + queueName );
}
int maxRetries = qakkaFig.getMaxSendRetries();
@@ -213,9 +213,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
public Collection<DatabaseQueueMessage> getNextMessagesInternal( String queueName, int count ) {
- List<String> queueNames = queueManager.getListOfQueues();
- if ( !queueNames.contains( queueName ) ) {
- throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+ if ( queueManager.getQueueConfig( queueName ) == null ) {
+ throw new NotFoundException( "Queue not found: " + queueName );
}
if ( actorSystemManager.getClientActor() == null || !actorSystemManager.isReady() ) {
@@ -280,9 +279,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
@Override
public Status ackMessage(String queueName, UUID queueMessageId ) {
- List<String> queueNames = queueManager.getListOfQueues();
- if ( !queueNames.contains( queueName ) ) {
- return Status.BAD_REQUEST;
+ if ( queueManager.getQueueConfig( queueName ) == null ) {
+ throw new NotFoundException( "Queue not found: " + queueName );
}
QueueAckRequest message = new QueueAckRequest( queueName, queueMessageId );
@@ -293,9 +291,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
@Override
public Status requeueMessage(String queueName, UUID messageId) {
- List<String> queueNames = queueManager.getListOfQueues();
- if ( !queueNames.contains( queueName ) ) {
- throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+ if ( queueManager.getQueueConfig( queueName ) == null ) {
+ throw new NotFoundException( "Queue not found: " + queueName );
}
QueueAckRequest message = new QueueAckRequest( queueName, messageId );
@@ -306,9 +303,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
@Override
public Status clearMessages(String queueName) {
- List<String> queueNames = queueManager.getListOfQueues();
- if ( !queueNames.contains( queueName ) ) {
- throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+ if ( queueManager.getQueueConfig( queueName ) == null ) {
+ throw new NotFoundException( "Queue not found: " + queueName );
}
// TODO: implement clear queue
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index c10d1f5..8ce9822 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -70,9 +70,9 @@ public class QueueMessageManagerTest extends AbstractTest {
@Test
public void testBasicOperation() throws Exception {
- Injector injector = getInjector();
+ String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric(15);
- CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
+ Injector injector = getInjector();
DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
@@ -82,54 +82,60 @@ public class QueueMessageManagerTest extends AbstractTest {
app.start( "localhost", getNextAkkaPort(), region );
// create queue and send one message to it
- String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric(15);
QueueManager queueManager = injector.getInstance( QueueManager.class );
- QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class );
- queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
- String jsonData = "{}";
- qmm.sendMessages( queueName, Collections.singletonList(region), null, null,
- "application/json", DataType.serializeValue( jsonData, ProtocolVersion.NEWEST_SUPPORTED) );
- distributedQueueService.refresh();
- Thread.sleep(1000);
+ try {
- // get message from the queue
- List<QueueMessage> messages = qmm.getNextMessages( queueName, 1 );
- Assert.assertEquals( 1, messages.size() );
- QueueMessage message = messages.get(0);
+ QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class );
+ queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) );
+ String jsonData = "{}";
+ qmm.sendMessages( queueName, Collections.singletonList( region ), null, null,
+ "application/json", DataType.serializeValue( jsonData, ProtocolVersion.NEWEST_SUPPORTED ) );
- // test that queue message data is present and correct
- QueueMessageSerialization qms = injector.getInstance( QueueMessageSerialization.class );
- DatabaseQueueMessageBody data = qms.loadMessageData( message.getMessageId() );
- Assert.assertNotNull( data );
- Assert.assertEquals( "application/json", data.getContentType() );
- String jsonDataReturned = new String( data.getBlob().array(), Charset.forName("UTF-8") );
- Assert.assertEquals( jsonData, jsonDataReturned );
-
- // test that transfer log is empty for our queue
- TransferLogSerialization tlogs = injector.getInstance( TransferLogSerialization.class );
- Result<TransferLog> all = tlogs.getAllTransferLogs( null, 1000 );
- List<TransferLog> logs = all.getEntities().stream()
+ distributedQueueService.refresh();
+ Thread.sleep( 1000 );
+
+ // get message from the queue
+ List<QueueMessage> messages = qmm.getNextMessages( queueName, 1 );
+ Assert.assertEquals( 1, messages.size() );
+ QueueMessage message = messages.get( 0 );
+
+ // test that queue message data is present and correct
+ QueueMessageSerialization qms = injector.getInstance( QueueMessageSerialization.class );
+ DatabaseQueueMessageBody data = qms.loadMessageData( message.getMessageId() );
+ Assert.assertNotNull( data );
+ Assert.assertEquals( "application/json", data.getContentType() );
+ String jsonDataReturned = new String( data.getBlob().array(), Charset.forName( "UTF-8" ) );
+ Assert.assertEquals( jsonData, jsonDataReturned );
+
+ // test that transfer log is empty for our queue
+ TransferLogSerialization tlogs = injector.getInstance( TransferLogSerialization.class );
+ Result<TransferLog> all = tlogs.getAllTransferLogs( null, 1000 );
+ List<TransferLog> logs = all.getEntities().stream()
.filter( log -> log.getQueueName().equals( queueName ) ).collect( Collectors.toList() );
- Assert.assertTrue( logs.isEmpty() );
+ Assert.assertTrue( logs.isEmpty() );
- // ack the message
- qmm.ackMessage( queueName, message.getQueueMessageId() );
+ // ack the message
+ qmm.ackMessage( queueName, message.getQueueMessageId() );
- // test that message is no longer stored in non-replicated keyspace
+ // test that message is no longer stored in non-replicated keyspace
- Assert.assertNull( qms.loadMessage( queueName, region, null,
- DatabaseQueueMessage.Type.DEFAULT, message.getQueueMessageId() ));
+ Assert.assertNull( qms.loadMessage( queueName, region, null,
+ DatabaseQueueMessage.Type.DEFAULT, message.getQueueMessageId() ) );
- Assert.assertNull( qms.loadMessage( queueName, region, null,
- DatabaseQueueMessage.Type.INFLIGHT, message.getQueueMessageId() ));
+ Assert.assertNull( qms.loadMessage( queueName, region, null,
+ DatabaseQueueMessage.Type.INFLIGHT, message.getQueueMessageId() ) );
- // test that audit log entry was written
- AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
- Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
- Assert.assertEquals( 3, auditLogs.getEntities().size() );
+ // test that audit log entry was written
+ AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
+ Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
+ Assert.assertEquals( 3, auditLogs.getEntities().size() );
- distributedQueueService.shutdown();
+ distributedQueueService.shutdown();
+
+ } finally {
+ queueManager.deleteQueue( queueName );
+ }
}
@@ -138,8 +144,6 @@ public class QueueMessageManagerTest extends AbstractTest {
Injector injector = getInjector();
- CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
QakkaFig qakkaFig = injector.getInstance( QakkaFig.class );
ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
@@ -152,74 +156,82 @@ public class QueueMessageManagerTest extends AbstractTest {
// create some number of queue messages
QueueManager queueManager = injector.getInstance( QueueManager.class );
- QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class );
- String queueName = "queue_testQueueMessageTimeouts_" + RandomStringUtils.randomAlphanumeric(15);
- queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
- int numMessages = 40;
+ String queueName = "queue_testQueueMessageTimeouts_" + RandomStringUtils.randomAlphanumeric( 15 );
- for ( int i=0; i<numMessages; i++ ) {
- qmm.sendMessages(
+ try {
+
+ QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class );
+ queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) );
+
+ int numMessages = 40;
+
+ for (int i = 0; i < numMessages; i++) {
+ qmm.sendMessages(
queueName,
Collections.singletonList( region ),
null, // delay
null, // expiration
"application/json",
DataType.serializeValue( "{}", ProtocolVersion.NEWEST_SUPPORTED ) );
- }
+ }
- int maxRetries = 15;
- int retries = 0;
- while ( retries++ < maxRetries ) {
- distributedQueueService.refresh();
- if (inMemoryQueue.size( queueName ) == 40) {
- break;
+ int maxRetries = 15;
+ int retries = 0;
+ while (retries++ < maxRetries) {
+ distributedQueueService.refresh();
+ if (inMemoryQueue.size( queueName ) == 40) {
+ break;
+ }
+ Thread.sleep( 500 );
}
- Thread.sleep( 500 );
- }
- Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName ) );
+ Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName ) );
- // get all messages from queue
+ // get all messages from queue
- List<QueueMessage> messages = qmm.getNextMessages( queueName, numMessages );
- Assert.assertEquals( numMessages, messages.size() );
+ List<QueueMessage> messages = qmm.getNextMessages( queueName, numMessages );
+ Assert.assertEquals( numMessages, messages.size() );
- // ack half of the messages
+ // ack half of the messages
- List<QueueMessage> remove = new ArrayList<>();
+ List<QueueMessage> remove = new ArrayList<>();
- for ( int i=0; i<numMessages/2; i++ ) {
- QueueMessage queueMessage = messages.get( i );
- qmm.ackMessage( queueName, queueMessage.getQueueMessageId() );
- remove.add( queueMessage );
- }
+ for (int i = 0; i < numMessages / 2; i++) {
+ QueueMessage queueMessage = messages.get( i );
+ qmm.ackMessage( queueName, queueMessage.getQueueMessageId() );
+ remove.add( queueMessage );
+ }
- for ( QueueMessage message : remove ) {
- messages.remove( message );
- }
+ for (QueueMessage message : remove) {
+ messages.remove( message );
+ }
- // wait for twice timeout period
+ // wait for twice timeout period
- Thread.sleep( 2 * qakkaFig.getQueueTimeoutSeconds()*1000 );
+ Thread.sleep( 2 * qakkaFig.getQueueTimeoutSeconds() * 1000 );
- distributedQueueService.processTimeouts();
+ distributedQueueService.processTimeouts();
- Thread.sleep( qakkaFig.getQueueTimeoutSeconds()*1000 );
+ Thread.sleep( qakkaFig.getQueueTimeoutSeconds() * 1000 );
- // attempt to ack other half of messages
+ // attempt to ack other half of messages
- for ( QueueMessage message : messages ) {
- try {
- qmm.ackMessage( queueName, message.getQueueMessageId() );
- Assert.fail("Message should have timed out by now");
+ for (QueueMessage message : messages) {
+ try {
+ qmm.ackMessage( queueName, message.getQueueMessageId() );
+ Assert.fail( "Message should have timed out by now" );
- } catch ( QakkaRuntimeException expected ) {
- // keep on going...
+ } catch (QakkaRuntimeException expected) {
+ // keep on going...
+ }
}
- }
- distributedQueueService.shutdown();
+ distributedQueueService.shutdown();
+
+ } finally {
+ queueManager.deleteQueue( queueName );
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
index 7423424..53f9224 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -78,35 +78,42 @@ public class QueueActorServiceTest extends AbstractTest {
QueueManager queueManager = injector.getInstance( QueueManager.class );
queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
- // send 1 queue message, get back one queue message
- UUID messageId = UUIDGen.getTimeUUID();
+ try {
- final String data = "my test data";
- final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
+ // send 1 queue message, get back one queue message
+ UUID messageId = UUIDGen.getTimeUUID();
+
+ final String data = "my test data";
+ final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" );
- serialization.writeMessageData( messageId, messageBody );
+ serialization.writeMessageData( messageId, messageBody );
- distributedQueueService.sendMessageToRegion(
- queueName, region, region, messageId, null, null);
+ distributedQueueService.sendMessageToRegion(
+ queueName, region, region, messageId, null, null );
- distributedQueueService.refresh();
- Thread.sleep(1000);
+ distributedQueueService.refresh();
+ Thread.sleep( 1000 );
- Collection<DatabaseQueueMessage> qmReturned = distributedQueueService.getNextMessages( queueName, 1 );
- Assert.assertEquals( 1, qmReturned.size() );
+ Collection<DatabaseQueueMessage> qmReturned = distributedQueueService.getNextMessages( queueName, 1 );
+ Assert.assertEquals( 1, qmReturned.size() );
- DatabaseQueueMessage dqm = qmReturned.iterator().next();
- DatabaseQueueMessageBody dqmb = serialization.loadMessageData( dqm.getMessageId() );
- ByteBuffer blob = dqmb.getBlob();
+ DatabaseQueueMessage dqm = qmReturned.iterator().next();
+ DatabaseQueueMessageBody dqmb = serialization.loadMessageData( dqm.getMessageId() );
+ ByteBuffer blob = dqmb.getBlob();
- String returnedData = new String( blob.array(), "UTF-8");
+ String returnedData = new String( blob.array(), "UTF-8" );
// ByteArrayInputStream bais = new ByteArrayInputStream( blob.array() );
// ObjectInputStream ios = new ObjectInputStream( bais );
// String returnedData = (String)ios.readObject();
- Assert.assertEquals( data, returnedData );
+ Assert.assertEquals( data, returnedData );
+
+ distributedQueueService.shutdown();
+
+ } finally {
+ queueManager.deleteQueue( queueName );
+ }
- distributedQueueService.shutdown();
}
@@ -128,51 +135,58 @@ public class QueueActorServiceTest extends AbstractTest {
String queueName = "queue_testGetMultipleQueueMessages_" + UUID.randomUUID();
QueueManager queueManager = injector.getInstance( QueueManager.class );
- queueManager.createQueue(
- new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
- for ( int i=0; i<100; i++ ) {
+ try {
- UUID messageId = UUIDGen.getTimeUUID();
+ queueManager.createQueue(
+ new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) );
- final String data = "my test data";
- final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
+ for (int i = 0; i < 100; i++) {
+
+ UUID messageId = UUIDGen.getTimeUUID();
+
+ final String data = "my test data";
+ final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" );
- serialization.writeMessageData( messageId, messageBody );
+ serialization.writeMessageData( messageId, messageBody );
- xferLogSerialization.recordTransferLog(
- queueName, actorSystemFig.getRegionLocal(), region, messageId );
+ xferLogSerialization.recordTransferLog(
+ queueName, actorSystemFig.getRegionLocal(), region, messageId );
- distributedQueueService.sendMessageToRegion(
- queueName, region, region, messageId , null, null);
- }
+ distributedQueueService.sendMessageToRegion(
+ queueName, region, region, messageId, null, null );
+ }
- int maxRetries = 25;
- int retries = 0;
- int count = 0;
- while ( retries++ < maxRetries ) {
- distributedQueueService.refresh();
- if (inMemoryQueue.size( queueName ) == 100) {
- count = 100;
- break;
+ int maxRetries = 25;
+ int retries = 0;
+ int count = 0;
+ while (retries++ < maxRetries) {
+ distributedQueueService.refresh();
+ if (inMemoryQueue.size( queueName ) == 100) {
+ count = 100;
+ break;
+ }
+ Thread.sleep( 1000 );
}
- Thread.sleep(1000);
- }
- Assert.assertEquals( 100, count );
+ Assert.assertEquals( 100, count );
- Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
- Assert.assertEquals( 75, inMemoryQueue.size( queueName ) );
+ Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
+ Assert.assertEquals( 75, inMemoryQueue.size( queueName ) );
- Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
- Assert.assertEquals( 50, inMemoryQueue.size( queueName ) );
+ Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
+ Assert.assertEquals( 50, inMemoryQueue.size( queueName ) );
- Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
- Assert.assertEquals( 25, inMemoryQueue.size( queueName ) );
+ Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
+ Assert.assertEquals( 25, inMemoryQueue.size( queueName ) );
- Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
- Assert.assertEquals( 0, inMemoryQueue.size( queueName ) );
+ Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
+ Assert.assertEquals( 0, inMemoryQueue.size( queueName ) );
- distributedQueueService.shutdown();
+ distributedQueueService.shutdown();
+
+ } finally {
+ queueManager.deleteQueue( queueName );
+ }
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
index 3bf352f..791650e 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
@@ -53,7 +53,6 @@ public class QueueActorHelperTest extends AbstractTest {
public void loadDatabaseQueueMessage() throws Exception {
Injector injector = getInjector();
- CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
injector.getInstance( App.class ); // init the INJECTOR
@@ -66,33 +65,39 @@ public class QueueActorHelperTest extends AbstractTest {
app.start( "localhost", getNextAkkaPort(), region );
String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
- queueManager.createQueue( new Queue( queueName ) );
- UUID queueMessageId = QakkaUtils.getTimeUuid();
+ try {
+ queueManager.createQueue( new Queue( queueName ) );
- // write message
+ UUID queueMessageId = QakkaUtils.getTimeUuid();
- DatabaseQueueMessage message = new DatabaseQueueMessage(
- QakkaUtils.getTimeUuid(),
- DatabaseQueueMessage.Type.DEFAULT,
- queueName,
- actorSystemFig.getRegionLocal(),
- null,
- System.currentTimeMillis(),
- null,
- queueMessageId);
- qms.writeMessage( message );
+ // write message
+
+ DatabaseQueueMessage message = new DatabaseQueueMessage(
+ QakkaUtils.getTimeUuid(),
+ DatabaseQueueMessage.Type.DEFAULT,
+ queueName,
+ actorSystemFig.getRegionLocal(),
+ null,
+ System.currentTimeMillis(),
+ null,
+ queueMessageId);
+ qms.writeMessage( message );
- // load message
+ // load message
- QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
- DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage(
- queueName, message.getQueueMessageId(), message.getType() );
+ QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+ DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage(
+ queueName, message.getQueueMessageId(), message.getType() );
- Assert.assertNotNull( queueMessage );
+ Assert.assertNotNull( queueMessage );
- DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
- distributedQueueService.shutdown();
+ DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
+ distributedQueueService.shutdown();
+
+ } finally {
+ queueManager.deleteQueue( queueName );
+ }
}
@@ -100,8 +105,6 @@ public class QueueActorHelperTest extends AbstractTest {
public void loadDatabaseQueueMessageNotFound() throws Exception {
Injector injector = getInjector();
- CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
injector.getInstance( App.class ); // init the INJECTOR
QueueManager queueManager = injector.getInstance( QueueManager.class );
@@ -112,20 +115,27 @@ public class QueueActorHelperTest extends AbstractTest {
app.start( "localhost", getNextAkkaPort(), region );
String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+
queueManager.createQueue( new Queue( queueName ) );
- // don't write any message
+ try {
+
+ // don't write any message
- // load message
+ // load message
- QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
- DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage(
+ QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+ DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage(
queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.DEFAULT );
- Assert.assertNull( queueMessage );
+ Assert.assertNull( queueMessage );
- DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
- distributedQueueService.shutdown();
+ DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
+ distributedQueueService.shutdown();
+
+ } finally {
+ queueManager.deleteQueue( queueName );
+ }
}
@@ -133,8 +143,6 @@ public class QueueActorHelperTest extends AbstractTest {
public void putInflight() throws Exception {
Injector injector = getInjector();
- CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
injector.getInstance( App.class ); // init the INJECTOR
@@ -153,7 +161,9 @@ public class QueueActorHelperTest extends AbstractTest {
String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
queueManager.createQueue( new Queue( queueName ) );
- DatabaseQueueMessage message = new DatabaseQueueMessage(
+ try {
+
+ DatabaseQueueMessage message = new DatabaseQueueMessage(
QakkaUtils.getTimeUuid(),
DatabaseQueueMessage.Type.DEFAULT,
queueName,
@@ -161,42 +171,46 @@ public class QueueActorHelperTest extends AbstractTest {
null,
System.currentTimeMillis(),
null,
- queueMessageId);
- qms.writeMessage( message );
+ queueMessageId );
+ qms.writeMessage( message );
- // put message inflight
+ // put message inflight
- QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
- helper.putInflight( queueName, message );
+ QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+ helper.putInflight( queueName, message );
- // message must be gone from messages_available table
+ // message must be gone from messages_available table
- Assert.assertNull( qms.loadMessage(
+ Assert.assertNull( qms.loadMessage(
queueName,
actorSystemFig.getRegionLocal(),
null,
DatabaseQueueMessage.Type.DEFAULT,
message.getQueueMessageId() ) );
- // message must be present in messages_inflight table
+ // message must be present in messages_inflight table
- Assert.assertNotNull( qms.loadMessage(
+ Assert.assertNotNull( qms.loadMessage(
queueName,
actorSystemFig.getRegionLocal(),
null,
DatabaseQueueMessage.Type.INFLIGHT,
message.getQueueMessageId() ) );
- // there must be an audit log record of the successful get operation
+ // there must be an audit log record of the successful get operation
+
+ AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
+ Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
+ Assert.assertEquals( 1, auditLogs.getEntities().size() );
+ Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get( 0 ).getStatus() );
+ Assert.assertEquals( AuditLog.Action.GET, auditLogs.getEntities().get( 0 ).getAction() );
- AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
- Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
- Assert.assertEquals( 1, auditLogs.getEntities().size() );
- Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get(0).getStatus() );
- Assert.assertEquals( AuditLog.Action.GET, auditLogs.getEntities().get(0).getAction() );
+ DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
+ distributedQueueService.shutdown();
- DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
- distributedQueueService.shutdown();
+ } finally {
+ queueManager.deleteQueue( queueName );
+ }
}
@@ -222,9 +236,11 @@ public class QueueActorHelperTest extends AbstractTest {
String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
queueManager.createQueue( new Queue( queueName ) );
- // write message to messages_inflight table
+ try {
- DatabaseQueueMessage message = new DatabaseQueueMessage(
+ // write message to messages_inflight table
+
+ DatabaseQueueMessage message = new DatabaseQueueMessage(
QakkaUtils.getTimeUuid(),
DatabaseQueueMessage.Type.INFLIGHT,
queueName,
@@ -232,34 +248,38 @@ public class QueueActorHelperTest extends AbstractTest {
null,
System.currentTimeMillis(),
null,
- queueMessageId);
- qms.writeMessage( message );
+ queueMessageId );
+ qms.writeMessage( message );
+
+ // ack message
- // ack message
+ QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+ helper.ackQueueMessage( queueName, message.getQueueMessageId() );
- QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
- helper.ackQueueMessage( queueName, message.getQueueMessageId() );
+ // message must be gone from messages_inflight table
- // message must be gone from messages_available table
+ Assert.assertNull( helper.loadDatabaseQueueMessage(
+ queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.INFLIGHT ) );
- Assert.assertNull( helper.loadDatabaseQueueMessage(
- queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.INFLIGHT ));
+ // message must be gone from messages_available table
- // message must be gone from messages_inflight table
+ Assert.assertNull( helper.loadDatabaseQueueMessage(
+ queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.DEFAULT ) );
- Assert.assertNull( helper.loadDatabaseQueueMessage(
- queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.DEFAULT ));
+ // there should be an audit log record of the successful ack operation
- // there should be an audit log record of the successful ack operation
+ AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
+ Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
+ Assert.assertEquals( 1, auditLogs.getEntities().size() );
+ Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get( 0 ).getStatus() );
+ Assert.assertEquals( AuditLog.Action.ACK, auditLogs.getEntities().get( 0 ).getAction() );
- AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
- Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
- Assert.assertEquals( 1, auditLogs.getEntities().size() );
- Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get(0).getStatus() );
- Assert.assertEquals( AuditLog.Action.ACK, auditLogs.getEntities().get(0).getAction() );
+ DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
+ distributedQueueService.shutdown();
- DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
- distributedQueueService.shutdown();
+ } finally {
+ queueManager.deleteQueue( queueName );
+ }
}
@@ -267,8 +287,6 @@ public class QueueActorHelperTest extends AbstractTest {
public void ackQueueMessageNotFound() throws Exception {
Injector injector = getInjector();
- CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
injector.getInstance( App.class ); // init the INJECTOR
QueueManager queueManager = injector.getInstance( QueueManager.class );
@@ -281,17 +299,23 @@ public class QueueActorHelperTest extends AbstractTest {
String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
queueManager.createQueue( new Queue( queueName ) );
- // don't write message, just make up some bogus IDs
+ try {
- UUID queueMessageId = QakkaUtils.getTimeUuid();
+ // don't write message, just make up some bogus IDs
+
+ UUID queueMessageId = QakkaUtils.getTimeUuid();
+
+ // ack message must fail
- // ack message must fail
+ QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+ Assert.assertEquals( DistributedQueueService.Status.NOT_INFLIGHT,
+ helper.ackQueueMessage( queueName, queueMessageId ) );
- QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
- Assert.assertEquals( DistributedQueueService.Status.BAD_REQUEST,
- helper.ackQueueMessage( queueName, queueMessageId ));
+ DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
+ distributedQueueService.shutdown();
- DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
- distributedQueueService.shutdown();
+ } finally {
+ queueManager.deleteQueue( queueName );
+ }
}
}