You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/10 21:45:20 UTC
[2/6] usergrid git commit: Add total-time metrics, handle Ack requests via WriterRouter, use C* batch-statement for Ack operation
Add total-time metrics, handle Ack requests via WriterRouter, use C* batch-statement for Ack operation
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7be8c274
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7be8c274
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7be8c274
Branch: refs/heads/usergrid-1318-queue
Commit: 7be8c274e83e1680221e06c327ad04e697c61ca6
Parents: 71fe06f
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Oct 10 09:37:32 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Oct 10 09:37:32 2016 -0400
----------------------------------------------------------------------
.../usergrid/persistence/qakka/QakkaFig.java | 2 +-
.../persistence/qakka/api/QueueResource.java | 83 ++++------
.../qakka/distributed/actors/QueueActor.java | 40 +----
.../distributed/actors/QueueActorHelper.java | 150 +++++------------
.../distributed/actors/QueueActorRouter.java | 50 +-----
.../distributed/actors/QueueRefresher.java | 103 +++++++++++-
.../qakka/distributed/actors/QueueSender.java | 16 +-
.../distributed/actors/QueueTimeouter.java | 13 --
.../qakka/distributed/actors/QueueWriter.java | 49 ++++--
.../distributed/actors/QueueWriterRouter.java | 7 +-
.../impl/DistributedQueueServiceImpl.java | 161 +++++++++++--------
.../impl/QueueActorRouterProducer.java | 3 +-
.../impl/QueueWriterRouterProducer.java | 11 +-
.../distributed/messages/QakkaMessage.java | 6 +-
.../distributed/messages/QueueGetResponse.java | 14 +-
.../distributed/messages/QueueSendResponse.java | 9 +-
.../messages/QueueWriteResponse.java | 8 +-
.../MultiShardMessageIterator.java | 6 +
.../QueueMessageSerialization.java | 5 +
.../impl/QueueMessageSerializationImpl.java | 151 ++++++++++++-----
.../distributed/QueueActorServiceTest.java | 3 -
.../actors/QueueActorHelperTest.java | 2 +-
.../distributed/actors/QueueReaderTest.java | 10 +-
.../DatabaseQueueMessageSerializationTest.java | 22 ++-
.../queue/src/test/resources/log4j.properties | 4 +-
25 files changed, 507 insertions(+), 421 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index 7d89187..3093c39 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -109,7 +109,7 @@ public interface QakkaFig extends GuicyFig, Serializable {
/** How long to wait for response from queue actor before timing out and trying again */
@Key(QUEUE_GET_TIMEOUT)
- @Default("4")
+ @Default("1")
int getGetTimeoutSeconds();
/** Max number of times to retry call to queue writer for queue send operation */
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
index 10dae04..b609de3 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
@@ -19,13 +19,10 @@
package org.apache.usergrid.persistence.qakka.api;
-import com.codahale.metrics.Timer;
import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
-import org.apache.usergrid.persistence.qakka.MetricsService;
import org.apache.usergrid.persistence.qakka.core.*;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +43,6 @@ public class QueueResource {
private final QueueManager queueManager;
private final QueueMessageManager queueMessageManager;
- private final MetricsService metricsService;
private final URIStrategy uriStrategy;
private final Regions regions;
@@ -55,13 +51,11 @@ public class QueueResource {
public QueueResource(
QueueManager queueManager,
QueueMessageManager queueMessageManager,
- MetricsService metricsService,
URIStrategy uriStrategy,
Regions regions ) {
this.queueManager = queueManager;
this.queueMessageManager = queueMessageManager;
- this.metricsService = metricsService;
this.uriStrategy = uriStrategy;
this.regions = regions;
}
@@ -262,8 +256,6 @@ public class QueueResource {
String contentType,
ByteBuffer byteBuffer) {
- Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_TOTAL ).time();
- try {
Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" );
@@ -285,9 +277,6 @@ public class QueueResource {
apiResponse.setCount( 1 );
return Response.ok().entity( apiResponse ).build();
- } finally {
- timer.close();
- }
}
@@ -297,38 +286,31 @@ public class QueueResource {
public Response getNextMessages( @PathParam("queueName") String queueName,
@QueryParam("count") @DefaultValue("1") String countParam) throws Exception {
- Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.GET_TIME_TOTAL ).time();
- try {
-
- Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" );
+ Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" );
- int count = 1;
- try {
- count = Integer.parseInt( countParam );
- } catch (Exception e) {
- throw new IllegalArgumentException( "Invalid count parameter" );
- }
- if (count <= 0) {
- // invalid count
- throw new IllegalArgumentException( "Count must be >= 1" );
- }
-
- List<QueueMessage> messages = queueMessageManager.getNextMessages( queueName, count );
+ int count = 1;
+ try {
+ count = Integer.parseInt( countParam );
+ } catch (Exception e) {
+ throw new IllegalArgumentException( "Invalid count parameter" );
+ }
+ if (count <= 0) {
+ // invalid count
+ throw new IllegalArgumentException( "Count must be >= 1" );
+ }
- ApiResponse apiResponse = new ApiResponse();
+ List<QueueMessage> messages = queueMessageManager.getNextMessages( queueName, count );
- if (messages != null && !messages.isEmpty()) {
- apiResponse.setQueueMessages( messages );
+ ApiResponse apiResponse = new ApiResponse();
- } else { // always return queueMessages field
- apiResponse.setQueueMessages( Collections.EMPTY_LIST );
- }
- apiResponse.setCount( apiResponse.getQueueMessages().size() );
- return Response.ok().entity( apiResponse ).build();
+ if (messages != null && !messages.isEmpty()) {
+ apiResponse.setQueueMessages( messages );
- } finally {
- timer.close();
+ } else { // always return queueMessages field
+ apiResponse.setQueueMessages( Collections.EMPTY_LIST );
}
+ apiResponse.setCount( apiResponse.getQueueMessages().size() );
+ return Response.ok().entity( apiResponse ).build();
}
@@ -338,25 +320,18 @@ public class QueueResource {
public Response ackMessage( @PathParam("queueName") String queueName,
@PathParam("queueMessageId") String queueMessageId) throws Exception {
- Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ACK_TIME_TOTAL ).time();
- try {
-
- Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" );
+ Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" );
- UUID messageUuid;
- try {
- messageUuid = UUID.fromString( queueMessageId );
- } catch (Exception e) {
- throw new IllegalArgumentException( "Invalid queue message UUID" );
- }
- queueMessageManager.ackMessage( queueName, messageUuid );
-
- ApiResponse apiResponse = new ApiResponse();
- return Response.ok().entity( apiResponse ).build();
-
- } finally {
- timer.close();
+ UUID messageUuid;
+ try {
+ messageUuid = UUID.fromString( queueMessageId );
+ } catch (Exception e) {
+ throw new IllegalArgumentException( "Invalid queue message UUID" );
}
+ queueMessageManager.ackMessage( queueName, messageUuid );
+
+ ApiResponse apiResponse = new ApiResponse();
+ return Response.ok().entity( apiResponse ).build();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 9ce38ef..248f9cd 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -64,32 +64,23 @@ public class QueueActor extends UntypedActor {
private final Set<String> queuesSeen = new HashSet<>();
- //private final Injector injector;
-
@Inject
public QueueActor(
- //Injector injector,
QakkaFig qakkaFig,
InMemoryQueue inMemoryQueue,
QueueActorHelper queueActorHelper,
MetricsService metricsService,
MessageCounterSerialization messageCounterSerialization
) {
- //this.injector = injector;
this.qakkaFig = qakkaFig;
this.inMemoryQueue = inMemoryQueue;
this.queueActorHelper = queueActorHelper;
this.metricsService = metricsService;
this.messageCounterSerialization = messageCounterSerialization;
-
-// qakkaFig = injector.getInstance( QakkaFig.class );
-// inMemoryQueue = injector.getInstance( InMemoryQueue.class );
-// queueActorHelper = injector.getInstance( QueueActorHelper.class );
-// metricsService = injector.getInstance( MetricsService.class );
-// messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
}
+
@Override
public void onReceive(Object message) {
@@ -134,6 +125,7 @@ public class QueueActor extends UntypedActor {
logger.debug("Created shard allocater for queue {}", request.getQueueName() );
}
+
} else if ( message instanceof QueueRefreshRequest ) {
QueueRefreshRequest request = (QueueRefreshRequest)message;
queuesSeen.add( request.getQueueName() );
@@ -154,6 +146,7 @@ public class QueueActor extends UntypedActor {
// hand-off to queue's reader
queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() );
+
} else if ( message instanceof QueueTimeoutRequest ) {
QueueTimeoutRequest request = (QueueTimeoutRequest)message;
@@ -169,6 +162,7 @@ public class QueueActor extends UntypedActor {
// ASYNCHRONOUS -> hand-off to queue's timeouter
queueTimeoutersByQueueName.get( request.getQueueName() ).tell( request, self() );
+
} else if ( message instanceof ShardCheckRequest ) {
ShardCheckRequest request = (ShardCheckRequest)message;
@@ -184,6 +178,7 @@ public class QueueActor extends UntypedActor {
// ASYNCHRONOUS -> hand-off to queue's shard allocator
shardAllocatorsByQueueName.get( request.getQueueName() ).tell( request, self() );
+
} else if ( message instanceof QueueGetRequest) {
QueueGetRequest queueGetRequest = (QueueGetRequest) message;
@@ -199,30 +194,7 @@ public class QueueActor extends UntypedActor {
Collection<DatabaseQueueMessage> messages = queueActorHelper.getMessages( queueName, numRequested);
getSender().tell( new QueueGetResponse(
- DistributedQueueService.Status.SUCCESS, messages ), getSender() );
-
- } finally {
- timer.close();
- }
-
-
- } else if ( message instanceof QueueAckRequest) {
-
- Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ACK_TIME_ACK ).time();
- try {
-
- QueueAckRequest queueAckRequest = (QueueAckRequest) message;
-
- queuesSeen.add( queueAckRequest.getQueueName() );
-
- DistributedQueueService.Status status = queueActorHelper.ackQueueMessage(
- queueAckRequest.getQueueName(),
- queueAckRequest.getQueueMessageId() );
-
- getSender().tell( new QueueAckResponse(
- queueAckRequest.getQueueName(),
- queueAckRequest.getQueueMessageId(),
- status ), getSender() );
+ DistributedQueueService.Status.SUCCESS, messages, queueName ), getSender() );
} finally {
timer.close();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
index e3996c5..fcb3fba 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
@@ -41,7 +41,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
public class QueueActorHelper {
@@ -55,9 +54,6 @@ public class QueueActorHelper {
private final MetricsService metricsService;
private final CassandraClient cassandraClient;
- private final AtomicLong runCount = new AtomicLong(0);
- private final AtomicLong totalRead = new AtomicLong(0);
-
@Inject
public QueueActorHelper(
@@ -109,7 +105,7 @@ public class QueueActorHelper {
if (queueMessage != null) {
- if (putInflight( queueName, queueMessage )) {
+ if (putInflight( queueMessage )) {
queueMessages.add( queueMessage );
}
@@ -125,10 +121,48 @@ public class QueueActorHelper {
}
+ boolean putInflight( DatabaseQueueMessage queueMessage ) {
+
+ UUID qmid = queueMessage.getQueueMessageId();
+ try {
+
+ messageSerialization.putInflight( queueMessage );
+
+ } catch ( Throwable t ) {
+ logger.error("Error putting inflight queue message "
+ + qmid + " queue name: " + queueMessage.getQueueName(), t);
+
+ auditLogSerialization.recordAuditLog(
+ AuditLog.Action.GET,
+ AuditLog.Status.ERROR,
+ queueMessage.getQueueName(),
+ actorSystemFig.getRegionLocal(),
+ queueMessage.getMessageId(),
+ qmid);
+
+ return false;
+ }
+
+ auditLogSerialization.recordAuditLog(
+ AuditLog.Action.GET,
+ AuditLog.Status.SUCCESS,
+ queueMessage.getQueueName(),
+ actorSystemFig.getRegionLocal(),
+ queueMessage.getMessageId(),
+ qmid);
+
+ return true;
+ }
+
+
DistributedQueueService.Status ackQueueMessage(String queueName, UUID queueMessageId ) {
- DatabaseQueueMessage queueMessage = loadDatabaseQueueMessage(
- queueName, queueMessageId, DatabaseQueueMessage.Type.INFLIGHT );
+ DatabaseQueueMessage queueMessage = messageSerialization.loadMessage(
+ queueName,
+ actorSystemFig.getRegionLocal(),
+ null,
+ DatabaseQueueMessage.Type.INFLIGHT,
+ queueMessageId );
if ( queueMessage == null ) {
logger.error("Queue {} queue message id {} not found in inflight table", queueName, queueMessageId);
@@ -174,106 +208,4 @@ public class QueueActorHelper {
return DistributedQueueService.Status.ERROR;
}
}
-
-
- boolean putInflight( String queueName, DatabaseQueueMessage queueMessage ) {
-
- UUID qmid = queueMessage.getQueueMessageId();
- try {
-
- DatabaseQueueMessage inflightMessage = new DatabaseQueueMessage(
- queueMessage.getMessageId(),
- DatabaseQueueMessage.Type.INFLIGHT,
- queueName,
- actorSystemFig.getRegionLocal(),
- null, // let serialization select the shard
- queueMessage.getQueuedAt(),
- System.currentTimeMillis(),
- qmid);
-
- messageSerialization.writeMessage( inflightMessage );
-
- DatabaseQueueMessage retrieved = loadDatabaseQueueMessage(
- queueName, qmid, DatabaseQueueMessage.Type.INFLIGHT );
- if ( retrieved == null ) {
- logger.error("Failed ot write queue message id {} to inflight table", qmid);
- return false;
- }
-
- messageSerialization.deleteMessage(
- queueName,
- actorSystemFig.getRegionLocal(),
- null,
- DatabaseQueueMessage.Type.DEFAULT,
- qmid);
-
- //logger.debug("Put message {} inflight for queue name {}", qmid, queueName);
-
- } catch ( Throwable t ) {
- logger.error("Error putting inflight queue message " + qmid + " queue name: " + queueName, t);
-
- auditLogSerialization.recordAuditLog(
- AuditLog.Action.GET,
- AuditLog.Status.ERROR,
- queueName,
- actorSystemFig.getRegionLocal(),
- queueMessage.getMessageId(),
- qmid);
-
- return false;
- }
-
- auditLogSerialization.recordAuditLog(
- AuditLog.Action.GET,
- AuditLog.Status.SUCCESS,
- queueName,
- actorSystemFig.getRegionLocal(),
- queueMessage.getMessageId(),
- qmid);
-
- return true;
- }
-
-
- void queueRefresh( String queueName ) {
-
- Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time();
-
- try {
-
- if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) {
-
- // TODO: need to track the starting shard
-
- ShardIterator shardIterator = new ShardIterator(
- cassandraClient, queueName, actorSystemFig.getRegionLocal(),
- Shard.Type.DEFAULT, Optional.empty() );
-
- UUID since = inMemoryQueue.getNewest( queueName );
-
- String region = actorSystemFig.getRegionLocal();
- MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
- cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT,
- shardIterator, since);
-
- int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName );
- int count = 0;
-
- while ( multiShardIterator.hasNext() && count < need ) {
- DatabaseQueueMessage queueMessage = multiShardIterator.next();
- inMemoryQueue.add( queueName, queueMessage );
- count++;
- }
-
- if ( count > 0 ) {
- logger.debug( "Added {} in-memory for queue {}, new size = {}",
- count, queueName, inMemoryQueue.size( queueName ) );
- }
- }
-
- } finally {
- timer.close();
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
index c40a3d9..f908e7f 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
@@ -27,6 +27,7 @@ import akka.routing.FromConfig;
import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
+import org.apache.usergrid.persistence.qakka.distributed.impl.QueueActorRouterProducer;
import org.apache.usergrid.persistence.qakka.distributed.messages.*;
@@ -36,10 +37,13 @@ import org.apache.usergrid.persistence.qakka.distributed.messages.*;
public class QueueActorRouter extends UntypedActor {
private final ActorRef routerRef;
+ private final QueueActorRouterProducer queueActorRouterProducer;
@Inject
- public QueueActorRouter( Injector injector ) {
+ public QueueActorRouter( QueueActorRouterProducer queueActorRouterProducer ) {
+
+ this.queueActorRouterProducer = queueActorRouterProducer;
this.routerRef = getContext().actorOf( FromConfig.getInstance().props(
Props.create(GuiceActorProducer.class, QueueActor.class)), "router");
@@ -48,51 +52,13 @@ public class QueueActorRouter extends UntypedActor {
@Override
public void onReceive(Object message) {
- // TODO: can we do something smarter than this if-then-else structure
- // e.g. if message is recognized as one of ours, then we just pass it on?
-
- if ( message instanceof QueueGetRequest) {
- QueueGetRequest qgr = (QueueGetRequest) message;
+ if ( queueActorRouterProducer.getMessageTypes().contains( message.getClass() ) ) {
+ QakkaMessage qakkaMessage = (QakkaMessage) message;
ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
- new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qgr.getQueueName() );
+ new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qakkaMessage.getQueueName() );
routerRef.tell( envelope, getSender() );
- } else if ( message instanceof QueueAckRequest) {
- QueueAckRequest qar = (QueueAckRequest)message;
-
- ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
- new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
- routerRef.tell( envelope, getSender());
-
- } else if ( message instanceof QueueInitRequest) {
- QueueInitRequest qar = (QueueInitRequest)message;
-
- ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
- new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
- routerRef.tell( envelope, getSender());
-
- } else if ( message instanceof QueueRefreshRequest) {
- QueueRefreshRequest qar = (QueueRefreshRequest)message;
-
- ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
- new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
- routerRef.tell( envelope, getSender());
-
- } else if ( message instanceof QueueTimeoutRequest) {
- QueueTimeoutRequest qar = (QueueTimeoutRequest)message;
-
- ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
- new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
- routerRef.tell( envelope, getSender());
-
- } else if ( message instanceof ShardCheckRequest) {
- ShardCheckRequest qar = (ShardCheckRequest)message;
-
- ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
- new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
- routerRef.tell( envelope, getSender());
-
} else {
unhandled(message);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
index ae9969c..afd5640 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
@@ -20,20 +20,52 @@
package org.apache.usergrid.persistence.qakka.distributed.actors;
import akka.actor.UntypedActor;
+import com.codahale.metrics.Timer;
import com.google.inject.Inject;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest;
+import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
public class QueueRefresher extends UntypedActor {
private static final Logger logger = LoggerFactory.getLogger( QueueRefresher.class );
- final QueueActorHelper helper;
+ private final ActorSystemFig actorSystemFig;
+ private final InMemoryQueue inMemoryQueue;
+ private final QakkaFig qakkaFig;
+ private final MetricsService metricsService;
+ private final CassandraClient cassandraClient;
+
@Inject
- public QueueRefresher( QueueActorHelper helper ) {
- this.helper = helper;
+ public QueueRefresher(
+ ActorSystemFig actorSystemFig,
+ InMemoryQueue inMemoryQueue,
+ QakkaFig qakkaFig,
+ MetricsService metricsService,
+ CassandraClient cassandraClient
+ ) {
+ this.actorSystemFig = actorSystemFig;
+ this.inMemoryQueue = inMemoryQueue;
+ this.qakkaFig = qakkaFig;
+ this.metricsService = metricsService;
+ this.cassandraClient = cassandraClient;
}
@@ -44,10 +76,73 @@ public class QueueRefresher extends UntypedActor {
QueueRefreshRequest request = (QueueRefreshRequest) message;
String queueName = request.getQueueName();
- helper.queueRefresh( queueName );
+ queueRefresh( queueName );
} else {
unhandled( message );
}
}
+
+ Map<String, Long> startingShards = new HashMap<>();
+
+
+ void queueRefresh( String queueName ) {
+
+ Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time();
+
+ try {
+
+ if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) {
+
+ final Optional shardIdOptional;
+ final String shardKey =
+ createShardKey( queueName, Shard.Type.DEFAULT, actorSystemFig.getRegionLocal() );
+ Long shardId = startingShards.get( shardKey );
+
+ if ( shardId != null ) {
+ shardIdOptional = Optional.of( shardId );
+ } else {
+ shardIdOptional = Optional.empty();
+ }
+
+ ShardIterator shardIterator = new ShardIterator(
+ cassandraClient, queueName, actorSystemFig.getRegionLocal(),
+ Shard.Type.DEFAULT, Optional.empty() );
+
+ UUID since = inMemoryQueue.getNewest( queueName );
+
+ String region = actorSystemFig.getRegionLocal();
+ MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
+ cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT,
+ shardIterator, since);
+
+ int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName );
+ int count = 0;
+
+ while ( multiShardIterator.hasNext() && count < need ) {
+ DatabaseQueueMessage queueMessage = multiShardIterator.next();
+ inMemoryQueue.add( queueName, queueMessage );
+ count++;
+ }
+
+ if ( multiShardIterator.getCurrentShard() != null ) {
+ startingShards.put( shardKey, multiShardIterator.getCurrentShard().getShardId() );
+ }
+
+ if ( count > 0 ) {
+ logger.debug( "Added {} in-memory for queue {}, new size = {}",
+ count, queueName, inMemoryQueue.size( queueName ) );
+ }
+ }
+
+ } finally {
+ timer.close();
+ }
+
+ }
+
+ private String createShardKey(String queueName, Shard.Type type, String region ) {
+ return queueName + "_" + type + region;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
index 3dc695e..ccc39f5 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
@@ -26,18 +26,13 @@ import akka.pattern.Patterns;
import akka.util.Timeout;
import com.codahale.metrics.Timer;
import com.google.inject.Inject;
-import com.google.inject.Injector;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
-import org.apache.usergrid.persistence.qakka.App;
import org.apache.usergrid.persistence.qakka.MetricsService;
import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
-import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendRequest;
-import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendResponse;
-import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
-import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResponse;
+import org.apache.usergrid.persistence.qakka.distributed.messages.*;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaException;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
@@ -80,13 +75,6 @@ public class QueueSender extends UntypedActor {
this.actorSystemFig = actorSystemFig;
this.qakkaFig = qakkaFig;
this.metricsService = metricsService;
-
-// actorSystemManager = injector.getInstance( ActorSystemManager.class );
-// transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
-// auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
-// actorSystemFig = injector.getInstance( ActorSystemFig.class );
-// qakkaFig = injector.getInstance( QakkaFig.class );
-// metricsService = injector.getInstance( MetricsService.class );
}
@Override
@@ -97,7 +85,7 @@ public class QueueSender extends UntypedActor {
// as far as caller is concerned, we are done.
getSender().tell( new QueueSendResponse(
- DistributedQueueService.Status.SUCCESS ), getSender() );
+ DistributedQueueService.Status.SUCCESS, qa.getQueueName() ), getSender() );
final QueueWriter.WriteStatus writeStatus = sendMessageToRegion(
qa.getQueueName(),
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
index 33f1dd9..b7a95df 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
@@ -22,29 +22,22 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
import akka.actor.UntypedActor;
import com.codahale.metrics.Timer;
import com.google.inject.Inject;
-import com.google.inject.Injector;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
-import org.apache.usergrid.persistence.qakka.App;
import org.apache.usergrid.persistence.qakka.MetricsService;
import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.core.CassandraClient;
-import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueTimeoutRequest;
-import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
-import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.text.DecimalFormat;
import java.util.Optional;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
public class QueueTimeouter extends UntypedActor {
@@ -70,12 +63,6 @@ public class QueueTimeouter extends UntypedActor {
this.actorSystemFig = actorSystemFig;
this.qakkaFig = qakkaFig;
this.cassandraClient = cassandraClient;
-
-// messageSerialization = injector.getInstance( QueueMessageSerialization.class );
-// actorSystemFig = injector.getInstance( ActorSystemFig.class );
-// qakkaFig = injector.getInstance( QakkaFig.class );
-// metricsService = injector.getInstance( MetricsService.class );
-// cassandraClient = injector.getInstance( CassandraClientImpl.class );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
index e014d59..a7dbbd0 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -22,17 +22,16 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
import akka.actor.UntypedActor;
import com.codahale.metrics.Timer;
import com.google.inject.Inject;
-import com.google.inject.Injector;
-import org.apache.usergrid.persistence.qakka.App;
import org.apache.usergrid.persistence.qakka.MetricsService;
import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueAckRequest;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueAckResponse;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResponse;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
-import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
import org.slf4j.Logger;
@@ -50,24 +49,21 @@ public class QueueWriter extends UntypedActor {
private final TransferLogSerialization transferLogSerialization;
private final AuditLogSerialization auditLogSerialization;
private final MetricsService metricsService;
-
+ private final QueueActorHelper queueActorHelper;
@Inject
public QueueWriter(
QueueMessageSerialization messageSerialization,
TransferLogSerialization transferLogSerialization,
AuditLogSerialization auditLogSerialization,
- MetricsService metricsService
+ MetricsService metricsService,
+ QueueActorHelper queueActorHelper
) {
- this.messageSerialization = messageSerialization;
+ this.messageSerialization = messageSerialization;
this.transferLogSerialization = transferLogSerialization;
- this.auditLogSerialization = auditLogSerialization;
- this.metricsService = metricsService;
-
-// messageSerialization = injector.getInstance( QueueMessageSerialization.class );
-// transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
-// auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
-// metricsService = injector.getInstance( MetricsService.class );
+ this.auditLogSerialization = auditLogSerialization;
+ this.metricsService = metricsService;
+ this.queueActorHelper = queueActorHelper;
}
@Override
@@ -86,6 +82,7 @@ public class QueueWriter extends UntypedActor {
DatabaseQueueMessage dbqm = null;
long currentTime = System.currentTimeMillis();
+ String queueName = qa.getQueueName();
try {
dbqm = new DatabaseQueueMessage(
@@ -115,7 +112,7 @@ public class QueueWriter extends UntypedActor {
dbqm.getMessageId() );
getSender().tell( new QueueWriteResponse(
- QueueWriter.WriteStatus.ERROR ), getSender() );
+ QueueWriter.WriteStatus.ERROR, queueName ), getSender() );
return;
}
@@ -136,7 +133,7 @@ public class QueueWriter extends UntypedActor {
qa.getMessageId() );
getSender().tell( new QueueWriteResponse(
- QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED ), getSender() );
+ QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED, queueName ), getSender() );
} catch (Throwable e) {
logger.debug( "Unable to delete transfer log for {} {} {} {}",
@@ -147,13 +144,33 @@ public class QueueWriter extends UntypedActor {
logger.debug("Error deleting transferlog", e);
getSender().tell( new QueueWriteResponse(
- QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ), getSender() );
+ QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED, queueName ), getSender() );
}
} finally {
timer.close();
}
+ } else if ( message instanceof QueueAckRequest ){
+
+ Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ACK_TIME_ACK ).time();
+ try {
+
+ QueueAckRequest queueAckRequest = (QueueAckRequest) message;
+
+ DistributedQueueService.Status status = queueActorHelper.ackQueueMessage(
+ queueAckRequest.getQueueName(),
+ queueAckRequest.getQueueMessageId() );
+
+ getSender().tell( new QueueAckResponse(
+ queueAckRequest.getQueueName(),
+ queueAckRequest.getQueueMessageId(),
+ status ), getSender() );
+
+ } finally {
+ timer.close();
+ }
+
} else {
unhandled( message );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
index 0e3e981..cb06c1d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
@@ -24,8 +24,8 @@ import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.routing.FromConfig;
import com.google.inject.Inject;
-import com.google.inject.Injector;
import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueAckRequest;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
@@ -37,7 +37,7 @@ public class QueueWriterRouter extends UntypedActor {
private final ActorRef router;
@Inject
- public QueueWriterRouter( Injector injector ) {
+ public QueueWriterRouter() {
this.router = getContext().actorOf( FromConfig.getInstance().props(
Props.create( GuiceActorProducer.class, QueueWriter.class )), "router");
@@ -46,7 +46,8 @@ public class QueueWriterRouter extends UntypedActor {
@Override
public void onReceive(Object message) {
- if ( message instanceof QueueWriteRequest) {
+ if ( message instanceof QueueWriteRequest || message instanceof QueueAckRequest ) {
+
router.tell( message, getSender() );
} else {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index 9063242..20bf608 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -22,14 +22,16 @@ package org.apache.usergrid.persistence.qakka.distributed.impl;
import akka.actor.ActorRef;
import akka.pattern.Patterns;
import akka.util.Timeout;
+import com.codahale.metrics.*;
+import com.codahale.metrics.Timer;
import com.datastax.driver.core.exceptions.InvalidQueryException;
-import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Singleton;
import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
import org.apache.usergrid.persistence.actorsystem.ClientActor;
import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
+import org.apache.usergrid.persistence.qakka.MetricsService;
import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.core.QueueManager;
import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
@@ -37,7 +39,6 @@ import org.apache.usergrid.persistence.qakka.distributed.messages.*;
import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
-import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
@@ -56,19 +57,21 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
private final ActorSystemManager actorSystemManager;
private final QueueManager queueManager;
private final QakkaFig qakkaFig;
-
+ private final MetricsService metricsService;
@Inject
public DistributedQueueServiceImpl(
Injector injector,
ActorSystemManager actorSystemManager,
QueueManager queueManager,
- QakkaFig qakkaFig
+ QakkaFig qakkaFig,
+ MetricsService metricsService
) {
this.actorSystemManager = actorSystemManager;
this.queueManager = queueManager;
this.qakkaFig = qakkaFig;
+ this.metricsService = metricsService;
GuiceActorProducer.INJECTOR = injector;
}
@@ -154,59 +157,66 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
String queueName, String sourceRegion, String destRegion, UUID messageId,
Long deliveryTime, Long expirationTime ) {
- if ( queueManager.getQueueConfig( queueName ) == null ) {
- throw new NotFoundException( "Queue not found: " + queueName );
- }
+ Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_TOTAL ).time();
+ try {
- int maxRetries = qakkaFig.getMaxSendRetries();
- int retries = 0;
+ if ( queueManager.getQueueConfig( queueName ) == null ) {
+ throw new NotFoundException( "Queue not found: " + queueName );
+ }
- QueueSendRequest request = new QueueSendRequest(
- queueName, sourceRegion, destRegion, messageId, deliveryTime, expirationTime );
+ int maxRetries = qakkaFig.getMaxSendRetries();
+ int retries = 0;
- while ( retries++ < maxRetries ) {
- try {
- Timeout t = new Timeout( qakkaFig.getSendTimeoutSeconds(), TimeUnit.SECONDS );
+ QueueSendRequest request = new QueueSendRequest(
+ queueName, sourceRegion, destRegion, messageId, deliveryTime, expirationTime );
- // send to current region via local clientActor
- ActorRef clientActor = actorSystemManager.getClientActor();
- Future<Object> fut = Patterns.ask( clientActor, request, t );
+ while ( retries++ < maxRetries ) {
+ try {
+ Timeout t = new Timeout( qakkaFig.getSendTimeoutSeconds(), TimeUnit.SECONDS );
- // wait for response...
- final Object response = Await.result( fut, t.duration() );
+ // send to current region via local clientActor
+ ActorRef clientActor = actorSystemManager.getClientActor();
+ Future<Object> fut = Patterns.ask( clientActor, request, t );
- if ( response != null && response instanceof QueueSendResponse) {
- QueueSendResponse qarm = (QueueSendResponse)response;
+ // wait for response...
+ final Object response = Await.result( fut, t.duration() );
- if ( !DistributedQueueService.Status.ERROR.equals( qarm.getSendStatus() )) {
+ if ( response != null && response instanceof QueueSendResponse) {
+ QueueSendResponse qarm = (QueueSendResponse)response;
- if ( retries > 1 ) {
- logger.debug("SUCCESS after {} retries", retries );
- }
+ if ( !DistributedQueueService.Status.ERROR.equals( qarm.getSendStatus() )) {
- // send refresh-queue-if-empty message
- QueueRefreshRequest qrr = new QueueRefreshRequest( queueName, false );
- clientActor.tell( qrr, null );
+ if ( retries > 1 ) {
+ logger.debug("SUCCESS after {} retries", retries );
+ }
+
+ // send refresh-queue-if-empty message
+ QueueRefreshRequest qrr = new QueueRefreshRequest( queueName, false );
+ clientActor.tell( qrr, null );
+
+ return qarm.getSendStatus();
+
+ } else {
+ logger.debug("ERROR STATUS sending to queue, retrying {}", retries );
+ }
- return qarm.getSendStatus();
+ } else if ( response != null ) {
+ logger.debug("NULL RESPONSE sending to queue, retrying {}", retries );
} else {
- logger.debug("ERROR STATUS sending to queue, retrying {}", retries );
+ logger.debug("TIMEOUT sending to queue, retrying {}", retries );
}
- } else if ( response != null ) {
- logger.debug("NULL RESPONSE sending to queue, retrying {}", retries );
-
- } else {
- logger.debug("TIMEOUT sending to queue, retrying {}", retries );
+ } catch ( Exception e ) {
+ logger.debug("ERROR sending to queue, retrying " + retries, e );
}
-
- } catch ( Exception e ) {
- logger.debug("ERROR sending to queue, retrying " + retries, e );
}
- }
- throw new QakkaRuntimeException( "Error sending to queue after " + retries );
+ throw new QakkaRuntimeException( "Error sending to queue after " + retries );
+
+ } finally {
+ timer.close();
+ }
}
@@ -214,19 +224,32 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
public Collection<DatabaseQueueMessage> getNextMessages( String queueName, int count ) {
List<DatabaseQueueMessage> ret = new ArrayList<>();
- long startTime = System.currentTimeMillis();
+ com.codahale.metrics.Timer.Context timer =
+ metricsService.getMetricRegistry().timer( MetricsService.GET_TIME_TOTAL ).time();
+
+ try {
- while ( ret.size() < count
- && System.currentTimeMillis() - startTime < qakkaFig.getLongPollTimeMillis()) {
+ long startTime = System.currentTimeMillis();
- ret.addAll( getNextMessagesInternal( queueName, count ));
+ while ( ret.size() < count
+ && System.currentTimeMillis() - startTime < qakkaFig.getLongPollTimeMillis()) {
- if ( ret.size() < count ) {
- try { Thread.sleep( qakkaFig.getLongPollTimeMillis() / 2 ); } catch (Exception ignored) {}
+ ret.addAll( getNextMessagesInternal( queueName, count ));
+
+ if ( ret.size() < count ) {
+ try { Thread.sleep( qakkaFig.getLongPollTimeMillis() / 2 ); } catch (Exception ignored) {}
+ }
}
+
+ if ( ret.isEmpty() ) {
+ logger.info( "Requested {} but queue '{}' is empty", count, queueName);
+ }
+ return ret;
+
+ } finally {
+ timer.close();
}
- return ret;
}
@@ -242,10 +265,10 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
}
int maxRetries = qakkaFig.getMaxGetRetries();
- int retries = 0;
+ int tries = 0;
QueueGetRequest request = new QueueGetRequest( queueName, count );
- while ( ++retries < maxRetries ) {
+ while ( ++tries < maxRetries ) {
try {
Timeout t = new Timeout( qakkaFig.getGetTimeoutSeconds(), TimeUnit.SECONDS );
@@ -261,8 +284,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
if ( response != null && response instanceof QueueGetResponse) {
QueueGetResponse qprm = (QueueGetResponse)response;
if ( qprm.isSuccess() ) {
- if (retries > 1) {
- logger.debug( "getNextMessage {} SUCCESS after {} retries", queueName, retries );
+ if (tries > 1) {
+ logger.warn( "getNextMessage {} SUCCESS after {} tries", queueName, tries );
}
}
logger.debug("Returning queue {} messages {}", queueName, qprm.getQueueMessages().size());
@@ -270,41 +293,49 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
} else if ( response != null ) {
- logger.debug("ERROR RESPONSE (1) popping queue {}, retrying {}", queueName, retries );
+ logger.debug("ERROR RESPONSE (1) popping queue {}, retrying {}", queueName, tries );
} else {
- logger.debug("TIMEOUT popping from queue {}, retrying {}", queueName, retries );
+ logger.debug("TIMEOUT popping from queue {}, retrying {}", queueName, tries );
}
} else if ( responseObject instanceof ClientActor.ErrorResponse ) {
final ClientActor.ErrorResponse errorResponse = (ClientActor.ErrorResponse)responseObject;
logger.debug("ACTORSYSTEM ERROR popping queue: {}, retrying {}",
- errorResponse.getMessage(), retries );
+ errorResponse.getMessage(), tries );
} else {
- logger.debug("UNKNOWN RESPONSE popping queue {}, retrying {}", queueName, retries );
+ logger.debug("UNKNOWN RESPONSE popping queue {}, retrying {}", queueName, tries );
}
} catch ( Exception e ) {
- logger.debug("ERROR popping to queue " + queueName + " retrying " + retries, e );
+ logger.error("ERROR popping to queue " + queueName + " retrying " + tries, e );
}
}
throw new QakkaRuntimeException(
- "Error getting from queue " + queueName + " after " + retries + " tries");
+ "Error getting from queue " + queueName + " after " + tries + " tries");
}
@Override
public Status ackMessage(String queueName, UUID queueMessageId ) {
- if ( queueManager.getQueueConfig( queueName ) == null ) {
- throw new NotFoundException( "Queue not found: " + queueName );
- }
+ Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ACK_TIME_TOTAL ).time();
+ try {
- QueueAckRequest message = new QueueAckRequest( queueName, queueMessageId );
- return sendMessageToLocalQueueActors( message );
+ if ( queueManager.getQueueConfig( queueName ) == null ) {
+ throw new NotFoundException( "Queue not found: " + queueName );
+ }
+
+ QueueAckRequest message = new QueueAckRequest( queueName, queueMessageId );
+ return sendMessageToLocalRouters( message );
+
+
+ } finally {
+ timer.close();
+ }
}
@@ -316,7 +347,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
}
QueueAckRequest message = new QueueAckRequest( queueName, messageId );
- return sendMessageToLocalQueueActors( message );
+ return sendMessageToLocalRouters( message );
}
@@ -332,7 +363,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
}
- private Status sendMessageToLocalQueueActors( QakkaMessage message ) {
+ private Status sendMessageToLocalRouters( QakkaMessage message ) {
int maxRetries = 5;
int retries = 0;
@@ -367,6 +398,6 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
}
public void shutdown() {
- actorSystemManager.shutdownAll();
+ // no op
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
index d74936b..3bf6180 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
@@ -28,6 +28,7 @@ import akka.cluster.singleton.ClusterSingletonProxy;
import akka.cluster.singleton.ClusterSingletonProxySettings;
import com.google.inject.Inject;
import com.google.inject.Injector;
+import com.google.inject.Singleton;
import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
import org.apache.usergrid.persistence.actorsystem.RouterProducer;
@@ -41,6 +42,7 @@ import java.util.HashMap;
import java.util.Map;
+@Singleton
public class QueueActorRouterProducer implements RouterProducer {
static Injector injector;
@@ -131,7 +133,6 @@ public class QueueActorRouterProducer implements RouterProducer {
public Collection<Class> getMessageTypes() {
return new ArrayList() {{
add( QueueGetRequest.class );
- add( QueueAckRequest.class );
add( QueueInitRequest.class );
add( QueueRefreshRequest.class );
add( QueueTimeoutRequest.class );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
index 006f1a7..c4e7acc 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
@@ -28,19 +28,22 @@ import akka.cluster.singleton.ClusterSingletonProxy;
import akka.cluster.singleton.ClusterSingletonProxySettings;
import com.google.inject.Inject;
import com.google.inject.Injector;
+import com.google.inject.Singleton;
import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
import org.apache.usergrid.persistence.actorsystem.RouterProducer;
import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.distributed.actors.QueueWriterRouter;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueAckRequest;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+@Singleton
public class QueueWriterRouterProducer implements RouterProducer {
static Injector injector;
@@ -128,7 +131,11 @@ public class QueueWriterRouterProducer implements RouterProducer {
@Override
public Collection<Class> getMessageTypes() {
- return Collections.singletonList( QueueWriteRequest.class );
+ return new ArrayList() {{
+ add( QueueAckRequest.class );
+ add( QueueWriteRequest.class );
+ }};
+
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java
index a1bbf14..a3919ea 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java
@@ -21,8 +21,8 @@ package org.apache.usergrid.persistence.qakka.distributed.messages;
import java.io.Serializable;
-/**
- * Marker interface
- */
+
public interface QakkaMessage extends Serializable {
+
+ String getQueueName();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java
index c8004fb..1776360 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java
@@ -30,16 +30,19 @@ import java.util.Collections;
public class QueueGetResponse implements QakkaMessage {
private final Collection<DatabaseQueueMessage> queueMessages;
private final DistributedQueueService.Status status;
+ private final String queueName;
-
- public QueueGetResponse(DistributedQueueService.Status status ) {
+ public QueueGetResponse(DistributedQueueService.Status status, String queueName ) {
this.status = status;
this.queueMessages = Collections.emptyList();
+ this.queueName = queueName;
}
- public QueueGetResponse(DistributedQueueService.Status status, Collection<DatabaseQueueMessage> queueMessages) {
+ public QueueGetResponse(
+ DistributedQueueService.Status status, Collection<DatabaseQueueMessage> queueMessages, String queueName) {
this.status = status;
this.queueMessages = queueMessages;
+ this.queueName = queueName;
}
public DistributedQueueService.Status getStatus() {
@@ -60,4 +63,9 @@ public class QueueGetResponse implements QakkaMessage {
.append( "status", status )
.toString();
}
+
+ @Override
+ public String getQueueName() {
+ return queueName;
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java
index 0c295a0..9b065af 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java
@@ -25,9 +25,12 @@ import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService
public class QueueSendResponse implements QakkaMessage {
private final DistributedQueueService.Status status;
+ private final String queueName;
- public QueueSendResponse(DistributedQueueService.Status status) {
+
+ public QueueSendResponse( DistributedQueueService.Status status, String queueName ) {
this.status = status;
+ this.queueName = queueName;
}
public DistributedQueueService.Status getSendStatus() {
@@ -40,4 +43,8 @@ public class QueueSendResponse implements QakkaMessage {
.toString();
}
+ @Override
+ public String getQueueName() {
+ return queueName;
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java
index 1eb513c..463b4df 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java
@@ -25,9 +25,11 @@ import org.apache.usergrid.persistence.qakka.distributed.actors.QueueWriter;
public class QueueWriteResponse implements QakkaMessage {
private final QueueWriter.WriteStatus status;
+ private String queueName;
- public QueueWriteResponse(QueueWriter.WriteStatus status) {
+ public QueueWriteResponse(QueueWriter.WriteStatus status, String queueName ) {
this.status = status;
+ this.queueName = queueName;
}
public QueueWriter.WriteStatus getSendStatus() {
@@ -40,4 +42,8 @@ public class QueueWriteResponse implements QakkaMessage {
.toString();
}
+ @Override
+ public String getQueueName() {
+ return queueName;
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
index 6ec0774..29327e2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
@@ -49,6 +49,7 @@ public class MultiShardMessageIterator implements Iterator<DatabaseQueueMessage>
private final Iterator<Shard> shardIterator;
private Iterator<DatabaseQueueMessage> currentIterator;
+
private Shard currentShard;
private UUID nextStart;
@@ -184,4 +185,9 @@ public class MultiShardMessageIterator implements Iterator<DatabaseQueueMessage>
}
+
+ public Shard getCurrentShard() {
+ return currentShard;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
index 3ebe735..86c50a5 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
@@ -51,4 +51,9 @@ public interface QueueMessageSerialization extends Migration {
DatabaseQueueMessageBody loadMessageData(final UUID messageId);
void deleteMessageData(final UUID messageId);
+
+ /**
+ * Write message to inflight table and remove from available table
+ */
+ void putInflight( DatabaseQueueMessage queueMessage );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
index d9a2543..33de7bc 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
@@ -19,6 +19,7 @@
package org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl;
+import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.Clause;
@@ -137,12 +138,6 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
final UUID queueMessageId = message.getQueueMessageId() == null ?
QakkaUtils.getTimeUuid() : message.getQueueMessageId();
- long queuedAt = message.getQueuedAt() == null ?
- System.currentTimeMillis() : message.getQueuedAt();
-
- long inflightAt = message.getInflightAt() == null ?
- message.getQueuedAt() : message.getInflightAt();
-
Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( message.getType() ) ?
Shard.Type.DEFAULT : Shard.Type.INFLIGHT;
@@ -152,16 +147,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
message.setShardId( shard.getShardId() );
}
- Statement insert = QueryBuilder.insertInto(getTableName(message.getType()))
- .value( COLUMN_QUEUE_NAME, message.getQueueName())
- .value( COLUMN_REGION, message.getRegion())
- .value( COLUMN_SHARD_ID, message.getShardId())
- .value( COLUMN_MESSAGE_ID, message.getMessageId())
- .value( COLUMN_QUEUE_MESSAGE_ID, queueMessageId)
- .value( COLUMN_INFLIGHT_AT, inflightAt )
- .value( COLUMN_QUEUED_AT, queuedAt)
- .using( QueryBuilder.ttl( maxTtl ) );
-
+ Statement insert = createWriteMessageStatement( message );
cassandraClient.getQueueMessageSession().execute(insert);
shardCounterSerialization.incrementCounter( message.getQueueName(), shardType, message.getShardId(), 1 );
@@ -233,28 +219,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
final DatabaseQueueMessage.Type type,
final UUID queueMessageId ) {
- final long shardId;
- if ( shardIdOrNull == null ) {
- Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( type ) ?
- Shard.Type.DEFAULT : Shard.Type.INFLIGHT;
- Shard shard = shardStrategy.selectShard(
- queueName, actorSystemFig.getRegionLocal(), shardType, queueMessageId );
- shardId = shard.getShardId();
- } else {
- shardId = shardIdOrNull;
- }
-
- Clause queueNameClause = QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName );
- Clause regionClause = QueryBuilder.eq( COLUMN_REGION, region );
- Clause shardIdClause = QueryBuilder.eq( COLUMN_SHARD_ID, shardId );
- Clause queueMessageIdClause = QueryBuilder.eq( COLUMN_QUEUE_MESSAGE_ID, queueMessageId);
-
- Statement delete = QueryBuilder.delete().from(getTableName( type ))
- .where(queueNameClause)
- .and(regionClause)
- .and(shardIdClause)
- .and(queueMessageIdClause);
-
+ Statement delete = createDeleteMessageStatement( queueName, region, null, type,queueMessageId);
cassandraClient.getQueueMessageSession().execute( delete );
messageCounterSerialization.decrementCounter( queueName, type, 1L );
@@ -297,14 +262,118 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
public void deleteMessageData( final UUID messageId ) {
Clause messageIdClause = QueryBuilder.eq(COLUMN_MESSAGE_ID, messageId);
-
- Statement delete = QueryBuilder.delete().from(TABLE_MESSAGE_DATA)
- .where(messageIdClause);
+ Statement delete = QueryBuilder.delete().from(TABLE_MESSAGE_DATA).where(messageIdClause);
cassandraClient.getApplicationSession().execute(delete);
}
+    /**
+     * Moves a queue message from the available (DEFAULT) table to the inflight table.
+     *
+     * The insert into the inflight table and the delete from the available table are sent
+     * to Cassandra together in one {@link BatchStatement}. The shard and message counters
+     * are updated afterwards, outside of that batch.
+     *
+     * @param message the available-table message to put inflight; its queue name, region,
+     *                message id, queue message id and queued-at time are copied to the new
+     *                inflight row, whose inflight-at time is set to the current time
+     */
+    @Override
+    public void putInflight( DatabaseQueueMessage message ) {
+
+        // create statement to write new queue message to inflight table
+
+        Shard.Type shardType = Shard.Type.INFLIGHT;
+        Shard shard = shardStrategy.selectShard(
+            message.getQueueName(), message.getRegion(), shardType, message.getQueueMessageId() );
+
+        DatabaseQueueMessage inflightMessage = new DatabaseQueueMessage(
+            message.getMessageId(),
+            DatabaseQueueMessage.Type.INFLIGHT,
+            message.getQueueName(),
+            message.getRegion(),
+            shard.getShardId(),
+            message.getQueuedAt(),
+            System.currentTimeMillis(),
+            message.getQueueMessageId() );
+
+        Statement insert = createWriteMessageStatement( inflightMessage );
+
+        // create statement to delete queue message from available table
+
+        Statement delete = createDeleteMessageStatement(
+            message.getQueueName(),
+            message.getRegion(),
+            null,
+            DatabaseQueueMessage.Type.DEFAULT,
+            message.getQueueMessageId());
+
+        // execute statements as a batch
+
+        BatchStatement batchStatement = new BatchStatement();
+        batchStatement.add( insert );
+        batchStatement.add( delete );
+        cassandraClient.getQueueMessageSession().execute( batchStatement );
+
+        // bump counters
+
+        // The inflight shard counter must be keyed by the shard the inflight row was written
+        // to (selected above), not by the shard id carried on the incoming available-table
+        // message, which belongs to a different shard type and may not be set at all.
+        shardCounterSerialization.incrementCounter(
+            message.getQueueName(), Shard.Type.INFLIGHT, shard.getShardId(), 1 );
+
+        messageCounterSerialization.incrementCounter(
+            message.getQueueName(), DatabaseQueueMessage.Type.INFLIGHT, 1L );
+
+        messageCounterSerialization.decrementCounter(
+            message.getQueueName(), DatabaseQueueMessage.Type.DEFAULT, 1L );
+    }
+
+
+    /**
+     * Builds, without executing, the statement that deletes one queue message row from the
+     * table that corresponds to the given message type.
+     *
+     * @param queueName      name of the queue that owns the message
+     * @param region         region of the message, also used when a shard has to be selected
+     * @param shardIdOrNull  shard id of the row, or null to have the shard strategy select it
+     *                       from the queue name, region, shard type and queue message id
+     * @param type           message type; picks the table and, when needed, the shard type
+     * @param queueMessageId id of the queue message row to delete
+     * @return the delete statement, ready to be executed or added to a batch
+     */
+    private Statement createDeleteMessageStatement( final String queueName,
+                                                    final String region,
+                                                    final Long shardIdOrNull,
+                                                    final DatabaseQueueMessage.Type type,
+                                                    final UUID queueMessageId ) {
+
+        final long targetShardId;
+        if ( shardIdOrNull != null ) {
+            targetShardId = shardIdOrNull;
+        } else {
+            // caller did not know the shard, so ask the strategy for it
+            final Shard.Type shardType;
+            if ( DatabaseQueueMessage.Type.DEFAULT.equals( type ) ) {
+                shardType = Shard.Type.DEFAULT;
+            } else {
+                shardType = Shard.Type.INFLIGHT;
+            }
+            targetShardId = shardStrategy
+                .selectShard( queueName, region, shardType, queueMessageId )
+                .getShardId();
+        }
+
+        return QueryBuilder.delete().from( getTableName( type ) )
+            .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) )
+            .and( QueryBuilder.eq( COLUMN_REGION, region ) )
+            .and( QueryBuilder.eq( COLUMN_SHARD_ID, targetShardId ) )
+            .and( QueryBuilder.eq( COLUMN_QUEUE_MESSAGE_ID, queueMessageId ) );
+    }
+
+
+    /**
+     * Builds, without executing, the insert statement that writes a queue message row to
+     * the table that corresponds to the message's type, using {@code maxTtl} as the TTL.
+     *
+     * Missing values are defaulted as follows: a null queue message id is replaced with a
+     * new time UUID, a null queued-at time with the current time, and a null inflight-at
+     * time with the (possibly defaulted) queued-at time. The defaults are only used in the
+     * statement; they are not written back to {@code message}.
+     *
+     * @param message the message to write; its queue name, region, shard id and message id
+     *                are used as-is
+     * @return the insert statement, ready to be executed or added to a batch
+     */
+    private Statement createWriteMessageStatement( DatabaseQueueMessage message ) {
+
+        final UUID queueMessageId = message.getQueueMessageId() == null ?
+            QakkaUtils.getTimeUuid() : message.getQueueMessageId();
+
+        final long queuedAt = message.getQueuedAt() == null ?
+            System.currentTimeMillis() : message.getQueuedAt();
+
+        // Fall back to the defaulted queuedAt rather than re-reading message.getQueuedAt(),
+        // which would be unboxed and throw NullPointerException when both times are null.
+        final long inflightAt = message.getInflightAt() == null ?
+            queuedAt : message.getInflightAt();
+
+        Statement insert = QueryBuilder.insertInto(getTableName(message.getType()))
+            .value( COLUMN_QUEUE_NAME, message.getQueueName())
+            .value( COLUMN_REGION, message.getRegion())
+            .value( COLUMN_SHARD_ID, message.getShardId())
+            .value( COLUMN_MESSAGE_ID, message.getMessageId())
+            .value( COLUMN_QUEUE_MESSAGE_ID, queueMessageId)
+            .value( COLUMN_INFLIGHT_AT, inflightAt )
+            .value( COLUMN_QUEUED_AT, queuedAt)
+            .using( QueryBuilder.ttl( maxTtl ) );
+
+        return insert;
+    }
+
+
public static String getTableName(DatabaseQueueMessage.Type messageType){
String table;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
index a5c95bd..5bd2b05 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -102,9 +102,6 @@ public class QueueActorServiceTest extends AbstractTest {
ByteBuffer blob = dqmb.getBlob();
String returnedData = new String( blob.array(), "UTF-8" );
-// ByteArrayInputStream bais = new ByteArrayInputStream( blob.array() );
-// ObjectInputStream ios = new ObjectInputStream( bais );
-// String returnedData = (String)ios.readObject();
Assert.assertEquals( data, returnedData );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
index 791650e..0f5b46f 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
@@ -177,7 +177,7 @@ public class QueueActorHelperTest extends AbstractTest {
// put message inflight
QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
- helper.putInflight( queueName, message );
+ helper.putInflight( message );
// message must be gone from messages_available table
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
index 5b42184..9fb8f29 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
@@ -30,6 +30,7 @@ import org.apache.usergrid.persistence.qakka.AbstractTest;
import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
@@ -53,6 +54,8 @@ public class QueueReaderTest extends AbstractTest {
Injector injector = getInjector();
+ injector.getInstance( DistributedQueueService.class ); // init the INJECTOR
+
QakkaFig qakkaFig = injector.getInstance( QakkaFig.class );
ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
ShardSerialization shardSerialization = injector.getInstance( ShardSerialization.class );
@@ -89,13 +92,16 @@ public class QueueReaderTest extends AbstractTest {
// run the QueueRefresher to fill up the in-memory queue
- QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+ ActorSystem system = ActorSystem.create("Test-" + queueName);
+ ActorRef queueReaderRef = system.actorOf(
+ Props.create( GuiceActorProducer.class, QueueRefresher.class ), "queueReader");
+ QueueRefreshRequest refreshRequest = new QueueRefreshRequest( queueName, false );
// need to wait for refresh to complete
int maxRetries = 10;
int retries = 0;
while ( inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize() && retries++ < maxRetries ) {
- helper.queueRefresh( queueName );
+ queueReaderRef.tell( refreshRequest, null ); // tell sends message, returns immediately
Thread.sleep(1000);
}