You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/05 21:44:26 UTC
[1/5] usergrid git commit: Ensure that injector is not used in Akka
constructors to avoid "cannot serialize injector" errors
Repository: usergrid
Updated Branches:
refs/heads/usergrid-1318-queue a90a0bbd2 -> fbce160a1
Ensure that injector is not used in Akka constructors to avoid "cannot serialize injector" errors
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/fe08fa72
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/fe08fa72
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/fe08fa72
Branch: refs/heads/usergrid-1318-queue
Commit: fe08fa72f8e807565141bac871796479f28aa3f1
Parents: a90a0bb
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Oct 5 17:38:32 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Oct 5 17:38:32 2016 -0400
----------------------------------------------------------------------
.../qakka/distributed/actors/QueueActor.java | 75 +++++++-------------
.../distributed/actors/QueueActorRouter.java | 2 +-
.../distributed/actors/QueueRefresher.java | 1 -
.../qakka/distributed/actors/QueueSender.java | 34 +++++----
.../distributed/actors/QueueSenderRouter.java | 2 +-
.../distributed/actors/QueueTimeouter.java | 66 +++++------------
.../qakka/distributed/actors/QueueWriter.java | 30 ++++----
.../distributed/actors/QueueWriterRouter.java | 2 +-
.../distributed/actors/ShardAllocator.java | 35 ++++++---
.../impl/DistributedQueueServiceImpl.java | 13 ++--
.../impl/QueueActorRouterProducer.java | 2 +-
.../impl/QueueSenderRouterProducer.java | 2 +-
.../impl/QueueWriterRouterProducer.java | 2 +-
.../impl/QueueMessageSerializationImpl.java | 18 +++--
14 files changed, 132 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 64f12d4..9ce38ef 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -52,7 +52,6 @@ public class QueueActor extends UntypedActor {
private final InMemoryQueue inMemoryQueue;
private final QueueActorHelper queueActorHelper;
private final MetricsService metricsService;
-
private final MessageCounterSerialization messageCounterSerialization;
private final Map<String, Cancellable> refreshSchedulersByQueueName = new HashMap<>();
@@ -63,24 +62,32 @@ public class QueueActor extends UntypedActor {
private final Map<String, ActorRef> queueTimeoutersByQueueName = new HashMap<>();
private final Map<String, ActorRef> shardAllocatorsByQueueName = new HashMap<>();
- private final AtomicLong runCount = new AtomicLong(0);
- private final AtomicLong messageCount = new AtomicLong(0);
private final Set<String> queuesSeen = new HashSet<>();
- private final Injector injector;
+ //private final Injector injector;
@Inject
- public QueueActor( Injector injector ) {
-
- this.injector = injector;
-
- qakkaFig = injector.getInstance( QakkaFig.class );
- inMemoryQueue = injector.getInstance( InMemoryQueue.class );
- queueActorHelper = injector.getInstance( QueueActorHelper.class );
- metricsService = injector.getInstance( MetricsService.class );
-
- messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
+ public QueueActor(
+ //Injector injector,
+ QakkaFig qakkaFig,
+ InMemoryQueue inMemoryQueue,
+ QueueActorHelper queueActorHelper,
+ MetricsService metricsService,
+ MessageCounterSerialization messageCounterSerialization
+ ) {
+ //this.injector = injector;
+ this.qakkaFig = qakkaFig;
+ this.inMemoryQueue = inMemoryQueue;
+ this.queueActorHelper = queueActorHelper;
+ this.metricsService = metricsService;
+ this.messageCounterSerialization = messageCounterSerialization;
+
+// qakkaFig = injector.getInstance( QakkaFig.class );
+// inMemoryQueue = injector.getInstance( InMemoryQueue.class );
+// queueActorHelper = injector.getInstance( QueueActorHelper.class );
+// metricsService = injector.getInstance( MetricsService.class );
+// messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
}
@Override
@@ -138,7 +145,7 @@ public class QueueActor extends UntypedActor {
if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) {
ActorRef readerRef = getContext().actorOf(
- Props.create( GuiceActorProducer.class, injector, QueueRefresher.class ),
+ Props.create( GuiceActorProducer.class, QueueRefresher.class ),
request.getQueueName() + "_reader");
queueReadersByQueueName.put( request.getQueueName(), readerRef );
}
@@ -154,7 +161,7 @@ public class QueueActor extends UntypedActor {
if ( queueTimeoutersByQueueName.get( request.getQueueName() ) == null ) {
ActorRef readerRef = getContext().actorOf(
- Props.create( GuiceActorProducer.class, injector, QueueTimeouter.class),
+ Props.create( GuiceActorProducer.class, QueueTimeouter.class),
request.getQueueName() + "_timeouter");
queueTimeoutersByQueueName.put( request.getQueueName(), readerRef );
}
@@ -169,7 +176,7 @@ public class QueueActor extends UntypedActor {
if ( shardAllocatorsByQueueName.get( request.getQueueName() ) == null ) {
ActorRef readerRef = getContext().actorOf(
- Props.create( GuiceActorProducer.class, injector, ShardAllocator.class),
+ Props.create( GuiceActorProducer.class, ShardAllocator.class),
request.getQueueName() + "_shard_allocator");
shardAllocatorsByQueueName.put( request.getQueueName(), readerRef );
}
@@ -191,43 +198,9 @@ public class QueueActor extends UntypedActor {
Collection<DatabaseQueueMessage> messages = queueActorHelper.getMessages( queueName, numRequested);
- messageCounterSerialization.decrementCounter(
- queueName,
- DatabaseQueueMessage.Type.DEFAULT,
- messages.size());
-
getSender().tell( new QueueGetResponse(
DistributedQueueService.Status.SUCCESS, messages ), getSender() );
-// long runs = runCount.incrementAndGet();
-// long messagesReturned = messageCount.addAndGet( queueMessages.size() );
-//
-// if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
-//
-// final DecimalFormat format = new DecimalFormat("##.###");
-// final long nano = 1000000000;
-// Timer t = metricsService.getMetricRegistry().timer(MetricsService.GET_TIME_GET );
-//
-// logger.debug("QueueActor get stats (queues {}):\n" +
-// " Num runs={}\n" +
-// " Messages={}\n" +
-// " Mean={}\n" +
-// " One min rate={}\n" +
-// " Five min rate={}\n" +
-// " Snapshot mean={}\n" +
-// " Snapshot min={}\n" +
-// " Snapshot max={}",
-// queuesSeen.toArray(),
-// t.getCount(),
-// messagesReturned,
-// format.format( t.getMeanRate() ),
-// format.format( t.getOneMinuteRate() ),
-// format.format( t.getFiveMinuteRate() ),
-// format.format( t.getSnapshot().getMean() / nano ),
-// format.format( (double) t.getSnapshot().getMin() / nano ),
-// format.format( (double) t.getSnapshot().getMax() / nano ) );
-// }
-
} finally {
timer.close();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
index 9257a0d..c40a3d9 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
@@ -42,7 +42,7 @@ public class QueueActorRouter extends UntypedActor {
public QueueActorRouter( Injector injector ) {
this.routerRef = getContext().actorOf( FromConfig.getInstance().props(
- Props.create(GuiceActorProducer.class, injector, QueueActor.class)), "router");
+ Props.create(GuiceActorProducer.class, QueueActor.class)), "router");
}
@Override
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
index 2f70088..ae9969c 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
@@ -43,7 +43,6 @@ public class QueueRefresher extends UntypedActor {
if ( message instanceof QueueRefreshRequest ) {
QueueRefreshRequest request = (QueueRefreshRequest) message;
- logger.debug( "running for queue {}", request.getQueueName() );
String queueName = request.getQueueName();
helper.queueRefresh( queueName );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
index 739e1c4..3dc695e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
@@ -61,19 +61,32 @@ public class QueueSender extends UntypedActor {
private final TransferLogSerialization transferLogSerialization;
private final AuditLogSerialization auditLogSerialization;
private final ActorSystemFig actorSystemFig;
- private final QakkaFig qakkaFig;
+ private final QakkaFig qakkaFig;
private final MetricsService metricsService;
@Inject
- public QueueSender( Injector injector ) {
-
- actorSystemManager = injector.getInstance( ActorSystemManager.class );
- transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
- auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
- actorSystemFig = injector.getInstance( ActorSystemFig.class );
- qakkaFig = injector.getInstance( QakkaFig.class );
- metricsService = injector.getInstance( MetricsService.class );
+ public QueueSender(
+ ActorSystemManager actorSystemManager,
+ TransferLogSerialization transferLogSerialization,
+ AuditLogSerialization auditLogSerialization,
+ ActorSystemFig actorSystemFig,
+ QakkaFig qakkaFig,
+ MetricsService metricsService
+ ) {
+ this.actorSystemManager = actorSystemManager;
+ this.transferLogSerialization = transferLogSerialization;
+ this.auditLogSerialization = auditLogSerialization;
+ this.actorSystemFig = actorSystemFig;
+ this.qakkaFig = qakkaFig;
+ this.metricsService = metricsService;
+
+// actorSystemManager = injector.getInstance( ActorSystemManager.class );
+// transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
+// auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
+// actorSystemFig = injector.getInstance( ActorSystemFig.class );
+// qakkaFig = injector.getInstance( QakkaFig.class );
+// metricsService = injector.getInstance( MetricsService.class );
}
@Override
@@ -195,9 +208,6 @@ public class QueueSender extends UntypedActor {
} else if ( writeStatus != null
&& writeStatus.equals( QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ) ) {
- //logger.debug( "Delivery Success, now removing transfer log: {}, {}, {}, {}",
- // new Object[]{queueName, actorSystemFig.getRegionLocal(), region, messageId} );
-
// queue actor failed to clean up transfer log
try {
transferLogSerialization.removeTransferLog(
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
index 92d0785..a205d71 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
@@ -41,7 +41,7 @@ public class QueueSenderRouter extends UntypedActor {
public QueueSenderRouter( Injector injector ) {
this.router = getContext().actorOf( FromConfig.getInstance().props(
- Props.create( GuiceActorProducer.class, injector, QueueSender.class )), "router");
+ Props.create( GuiceActorProducer.class, QueueSender.class )), "router");
}
@Override
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
index 9b11277..33f1dd9 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
@@ -53,24 +53,29 @@ public class QueueTimeouter extends UntypedActor {
private final QueueMessageSerialization messageSerialization;
private final MetricsService metricsService;
private final ActorSystemFig actorSystemFig;
- private final QakkaFig qakkaFig;
+ private final QakkaFig qakkaFig;
private final CassandraClient cassandraClient;
- private final MessageCounterSerialization messageCounterSerialization;
-
- private final AtomicLong runCount = new AtomicLong(0);
- private final AtomicLong totalTimedout = new AtomicLong(0);
@Inject
- public QueueTimeouter( Injector injector) {
-
- messageSerialization = injector.getInstance( QueueMessageSerialization.class );
- actorSystemFig = injector.getInstance( ActorSystemFig.class );
- qakkaFig = injector.getInstance( QakkaFig.class );
- metricsService = injector.getInstance( MetricsService.class );
- cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
- messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
+ public QueueTimeouter(
+ QueueMessageSerialization messageSerialization,
+ MetricsService metricsService,
+ ActorSystemFig actorSystemFig,
+ QakkaFig qakkaFig,
+ CassandraClient cassandraClient
+ ) {
+ this.messageSerialization = messageSerialization;
+ this.metricsService = metricsService;
+ this.actorSystemFig = actorSystemFig;
+ this.qakkaFig = qakkaFig;
+ this.cassandraClient = cassandraClient;
+
+// messageSerialization = injector.getInstance( QueueMessageSerialization.class );
+// actorSystemFig = injector.getInstance( ActorSystemFig.class );
+// qakkaFig = injector.getInstance( QakkaFig.class );
+// metricsService = injector.getInstance( MetricsService.class );
+// cassandraClient = injector.getInstance( CassandraClientImpl.class );
}
@@ -135,41 +140,8 @@ public class QueueTimeouter extends UntypedActor {
}
}
-
- long runs = runCount.incrementAndGet();
- long timeoutCount = totalTimedout.addAndGet( count );
-
- if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
-
- final DecimalFormat format = new DecimalFormat("##.###");
- final long nano = 1000000000;
- Timer t = metricsService.getMetricRegistry().timer(MetricsService.TIMEOUT_TIME );
-
- logger.debug("QueueTimeouter for queue '{}' stats:\n" +
- " Num runs={}\n" +
- " Timeout count={}\n" +
- " Mean={}\n" +
- " One min rate={}\n" +
- " Five min rate={}\n" +
- " Snapshot mean={}\n" +
- " Snapshot min={}\n" +
- " Snapshot max={}",
- queueName,
- t.getCount(),
- timeoutCount,
- format.format( t.getMeanRate() ),
- format.format( t.getOneMinuteRate() ),
- format.format( t.getFiveMinuteRate() ),
- format.format( t.getSnapshot().getMean() / nano ),
- format.format( (double) t.getSnapshot().getMin() / nano ),
- format.format( (double) t.getSnapshot().getMax() / nano ) );
- }
-
if (count > 0) {
logger.debug( "Timed out {} messages for queue {}", count, queueName );
-
- messageCounterSerialization.decrementCounter(
- queueName, DatabaseQueueMessage.Type.DEFAULT, count);
}
} finally {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
index 273f0b2..e014d59 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -46,25 +46,28 @@ public class QueueWriter extends UntypedActor {
public enum WriteStatus { SUCCESS_XFERLOG_DELETED, SUCCESS_XFERLOG_NOTDELETED, ERROR };
- private final DistributedQueueService distributedQueueService;
private final QueueMessageSerialization messageSerialization;
private final TransferLogSerialization transferLogSerialization;
private final AuditLogSerialization auditLogSerialization;
private final MetricsService metricsService;
- private final MessageCounterSerialization messageCounterSerialization;
-
@Inject
- public QueueWriter( Injector injector ) {
-
- messageSerialization = injector.getInstance( QueueMessageSerialization.class );
- transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
- auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
- metricsService = injector.getInstance( MetricsService.class );
-
- distributedQueueService = injector.getInstance( DistributedQueueService.class );
- messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
+ public QueueWriter(
+ QueueMessageSerialization messageSerialization,
+ TransferLogSerialization transferLogSerialization,
+ AuditLogSerialization auditLogSerialization,
+ MetricsService metricsService
+ ) {
+ this.messageSerialization = messageSerialization;
+ this.transferLogSerialization = transferLogSerialization;
+ this.auditLogSerialization = auditLogSerialization;
+ this.metricsService = metricsService;
+
+// messageSerialization = injector.getInstance( QueueMessageSerialization.class );
+// transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
+// auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
+// metricsService = injector.getInstance( MetricsService.class );
}
@Override
@@ -97,9 +100,6 @@ public class QueueWriter extends UntypedActor {
messageSerialization.writeMessage( dbqm );
- messageCounterSerialization.incrementCounter(
- qa.getQueueName(), DatabaseQueueMessage.Type.DEFAULT, 1);
-
//logger.debug("Wrote queue message id {} to queue name {}",
// dbqm.getQueueMessageId(), dbqm.getQueueName());
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
index f0540af..0e3e981 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
@@ -40,7 +40,7 @@ public class QueueWriterRouter extends UntypedActor {
public QueueWriterRouter( Injector injector ) {
this.router = getContext().actorOf( FromConfig.getInstance().props(
- Props.create( GuiceActorProducer.class, injector, QueueWriter.class )), "router");
+ Props.create( GuiceActorProducer.class, QueueWriter.class )), "router");
}
@Override
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
index 65c3370..46dc0ed 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.qakka.MetricsService;
import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.core.CassandraClient;
import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
import org.apache.usergrid.persistence.qakka.distributed.messages.ShardCheckRequest;
import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
@@ -59,14 +60,27 @@ public class ShardAllocator extends UntypedActor {
@Inject
- public ShardAllocator( Injector injector ) {
-
- this.qakkaFig = injector.getInstance( QakkaFig.class );
- this.shardCounterSerialization = injector.getInstance( ShardCounterSerializationImpl.class );
- this.shardSerialization = injector.getInstance( ShardSerializationImpl.class );
- this.actorSystemFig = injector.getInstance( ActorSystemFig.class );
- this.metricsService = injector.getInstance( MetricsService.class );
- this.cassandraClient = injector.getInstance( CassandraClientImpl.class );
+ public ShardAllocator(
+ QakkaFig qakkaFig,
+ ActorSystemFig actorSystemFig,
+ ShardSerialization shardSerialization,
+ ShardCounterSerialization shardCounterSerialization,
+ MetricsService metricsService,
+ CassandraClient cassandraClient
+ ) {
+ this.qakkaFig = qakkaFig;
+ this.actorSystemFig = actorSystemFig;
+ this.shardSerialization = shardSerialization;
+ this.shardCounterSerialization = shardCounterSerialization;
+ this.metricsService = metricsService;
+ this.cassandraClient = cassandraClient;
+
+// this.qakkaFig = injector.getInstance( QakkaFig.class );
+// this.shardCounterSerialization = injector.getInstance( ShardCounterSerializationImpl.class );
+// this.shardSerialization = injector.getInstance( ShardSerializationImpl.class );
+// this.actorSystemFig = injector.getInstance( ActorSystemFig.class );
+// this.metricsService = injector.getInstance( MetricsService.class );
+// this.cassandraClient = injector.getInstance( CassandraClientImpl.class );
}
@@ -106,7 +120,10 @@ public class ShardAllocator extends UntypedActor {
}
if (shard == null) {
- logger.warn( "No shard found for {}, {}, {}", queueName, region, type );
+ shard = new Shard( queueName, actorSystemFig.getRegionLocal(),
+ type, 1L, QakkaUtils.getTimeUuid());
+ shardSerialization.createShard( shard );
+
return;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index af71247..9063242 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -23,10 +23,13 @@ import akka.actor.ActorRef;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.datastax.driver.core.exceptions.InvalidQueryException;
+import com.google.inject.Guice;
import com.google.inject.Inject;
+import com.google.inject.Injector;
import com.google.inject.Singleton;
import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
import org.apache.usergrid.persistence.actorsystem.ClientActor;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.core.QueueManager;
import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
@@ -53,23 +56,21 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
private final ActorSystemManager actorSystemManager;
private final QueueManager queueManager;
private final QakkaFig qakkaFig;
- private final MessageCounterSerialization messageCounterSerialization;
@Inject
public DistributedQueueServiceImpl(
+ Injector injector,
ActorSystemManager actorSystemManager,
QueueManager queueManager,
- QakkaFig qakkaFig,
- MessageCounterSerialization messageCounterSerialization ) {
+ QakkaFig qakkaFig
+ ) {
this.actorSystemManager = actorSystemManager;
this.queueManager = queueManager;
this.qakkaFig = qakkaFig;
- this.messageCounterSerialization = messageCounterSerialization;
-
-
+ GuiceActorProducer.INJECTOR = injector;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
index 8c6adda..d74936b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
@@ -73,7 +73,7 @@ public class QueueActorRouterProducer implements RouterProducer {
ClusterSingletonManagerSettings.create( system ).withRole( "io" );
system.actorOf( ClusterSingletonManager.props(
- Props.create( GuiceActorProducer.class, injector, QueueActorRouter.class ),
+ Props.create( GuiceActorProducer.class, QueueActorRouter.class ),
PoisonPill.getInstance(), settings ), "queueActorRouter" );
ClusterSingletonProxySettings proxySettings =
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java
index 885a559..4fa9048 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java
@@ -73,7 +73,7 @@ public class QueueSenderRouterProducer implements RouterProducer {
ClusterSingletonManagerSettings.create( system ).withRole( "io" );
system.actorOf( ClusterSingletonManager.props(
- Props.create( GuiceActorProducer.class, injector, QueueSenderRouter.class ),
+ Props.create( GuiceActorProducer.class, QueueSenderRouter.class ),
PoisonPill.getInstance(), settings ), "queueSenderRouter" );
ClusterSingletonProxySettings proxySettings =
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
index c8b5567..006f1a7 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
@@ -73,7 +73,7 @@ public class QueueWriterRouterProducer implements RouterProducer {
ClusterSingletonManagerSettings.create( system ).withRole( "io" );
system.actorOf( ClusterSingletonManager.props(
- Props.create( GuiceActorProducer.class, injector, QueueWriterRouter.class ),
+ Props.create( GuiceActorProducer.class, QueueWriterRouter.class ),
PoisonPill.getInstance(), settings ), "queueWriterRouter" );
ClusterSingletonProxySettings proxySettings =
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
index 02862c4..d9a2543 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
@@ -36,6 +36,7 @@ import org.apache.usergrid.persistence.qakka.core.CassandraClient;
import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
@@ -59,6 +60,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
private final ActorSystemFig actorSystemFig;
private final ShardStrategy shardStrategy;
private final ShardCounterSerialization shardCounterSerialization;
+ private final MessageCounterSerialization messageCounterSerialization;
public final static String COLUMN_QUEUE_NAME = "queue_name";
public final static String COLUMN_REGION = "region";
@@ -114,14 +116,16 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
ActorSystemFig actorSystemFig,
ShardStrategy shardStrategy,
ShardCounterSerialization shardCounterSerialization,
+ MessageCounterSerialization messageCounterSerialization,
CassandraClient cassandraClient,
QakkaFig qakkaFig
) {
- this.cassandraConfig = cassandraConfig;
- this.actorSystemFig = actorSystemFig;
- this.shardStrategy = shardStrategy;
- this.shardCounterSerialization = shardCounterSerialization;
- this.cassandraClient = cassandraClient;
+ this.cassandraConfig = cassandraConfig;
+ this.actorSystemFig = actorSystemFig;
+ this.shardStrategy = shardStrategy;
+ this.shardCounterSerialization = shardCounterSerialization;
+ this.messageCounterSerialization = messageCounterSerialization;
+ this.cassandraClient = cassandraClient;
this.maxTtl = qakkaFig.getMaxTtlSeconds();
}
@@ -162,6 +166,8 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
shardCounterSerialization.incrementCounter( message.getQueueName(), shardType, message.getShardId(), 1 );
+ messageCounterSerialization.incrementCounter( message.getQueueName(), message.getType(), 1L );
+
return queueMessageId;
}
@@ -250,6 +256,8 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
.and(queueMessageIdClause);
cassandraClient.getQueueMessageSession().execute( delete );
+
+ messageCounterSerialization.decrementCounter( queueName, type, 1L );
}
[5/5] usergrid git commit: Ensure correct keyspace names used
Posted by sn...@apache.org.
Ensure correct keyspace names used
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/fbce160a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/fbce160a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/fbce160a
Branch: refs/heads/usergrid-1318-queue
Commit: fbce160a1a87418ee6eb09c6183a9db8f35aec8c
Parents: 202d5be
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Oct 5 17:43:19 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Oct 5 17:43:19 2016 -0400
----------------------------------------------------------------------
.../persistence/core/CassandraConfig.java | 28 +++
.../persistence/core/CassandraConfigImpl.java | 69 ++++++++
.../core/datastax/impl/DataStaxClusterImpl.java | 57 ++++---
.../core/astyanax/ColumnNameIteratorTest.java | 65 +++++++
.../MultiKeyColumnNameIteratorTest.java | 66 +++++++
.../astyanax/MultiRowColumnIteratorTest.java | 67 ++++++++
.../qakka/core/impl/QueueManagerImpl.java | 13 +-
.../qakka/api/QueueResourceTest.java | 170 ++++++++++---------
8 files changed, 421 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
index f45ad43..499561e 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
@@ -20,6 +20,8 @@ package org.apache.usergrid.persistence.core;
import com.netflix.astyanax.model.ConsistencyLevel;
+import org.apache.cassandra.db.marshal.AbstractCompositeType;
+import org.apache.log4j.lf5.viewer.categoryexplorer.CategoryPath;
/**
@@ -77,4 +79,30 @@ public interface CassandraConfig {
String getApplicationKeyspace();
String getApplicationLocalKeyspace();
+
+ String getLocalDataCenter();
+
+ int getConnections();
+
+ int getTimeout();
+
+ int getPoolTimeout();
+
+ String getClusterName();
+
+ String getHosts();
+
+ String getVersion();
+
+ String getUsername();
+
+ String getPassword();
+
+ String getStrategy();
+
+ String getStrategyOptions();
+
+ String getStrategyLocal();
+
+ String getStrategyOptionsLocal();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
index 77f7228..9cdec95 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
@@ -26,6 +26,7 @@ import java.beans.PropertyChangeListener;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.model.ConsistencyLevel;
+import org.apache.log4j.lf5.viewer.categoryexplorer.CategoryPath;
/**
@@ -35,6 +36,7 @@ import com.netflix.astyanax.model.ConsistencyLevel;
@Singleton
public class CassandraConfigImpl implements CassandraConfig {
+ private CassandraFig cassandraFig;
private ConsistencyLevel readCl;
private ConsistencyLevel writeCl;
@@ -54,6 +56,8 @@ public class CassandraConfigImpl implements CassandraConfig {
@Inject
public CassandraConfigImpl( final CassandraFig cassandraFig ) {
+ this.cassandraFig = cassandraFig;
+
this.readCl = ConsistencyLevel.valueOf( cassandraFig.getAstyanaxReadCL() );
this.writeCl = ConsistencyLevel.valueOf( cassandraFig.getAstyanaxWriteCL() );
@@ -154,4 +158,69 @@ public class CassandraConfigImpl implements CassandraConfig {
return applicationLocalKeyspace;
}
+ @Override
+ public String getLocalDataCenter() {
+ return cassandraFig.getLocalDataCenter();
+ }
+
+ @Override
+ public int getConnections() {
+ return cassandraFig.getConnections();
+ }
+
+ @Override
+ public int getTimeout() {
+ return cassandraFig.getTimeout();
+ }
+
+ @Override
+ public int getPoolTimeout() {
+ return cassandraFig.getPoolTimeout();
+ }
+
+ @Override
+ public String getClusterName() {
+ return cassandraFig.getClusterName();
+ }
+
+ @Override
+ public String getHosts() {
+ return cassandraFig.getHosts();
+ }
+
+ @Override
+ public String getVersion() {
+ return cassandraFig.getVersion();
+ }
+
+ @Override
+ public String getUsername() {
+ return cassandraFig.getUsername();
+ }
+
+ @Override
+ public String getPassword() {
+ return cassandraFig.getPassword();
+ }
+
+ @Override
+ public String getStrategy() {
+ return cassandraFig.getStrategyLocal();
+ }
+
+ @Override
+ public String getStrategyOptions() {
+ return cassandraFig.getStrategyOptions();
+ }
+
+ @Override
+ public String getStrategyLocal() {
+ return cassandraFig.getStrategyLocal();
+ }
+
+ @Override
+ public String getStrategyOptionsLocal() {
+ return cassandraFig.getStrategyOptionsLocal();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
index c8ddf3e..67f6123 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
@@ -23,6 +23,7 @@ import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.datastax.CQLUtils;
import org.apache.usergrid.persistence.core.datastax.DataStaxCluster;
@@ -36,15 +37,15 @@ public class DataStaxClusterImpl implements DataStaxCluster {
private static final Logger logger = LoggerFactory.getLogger( DataStaxClusterImpl.class );
- private final CassandraFig cassandraFig;
+ private final CassandraConfig cassandraConfig;
private Cluster cluster;
private Session applicationSession;
private Session queueMessageSession;
private Session clusterSession;
@Inject
- public DataStaxClusterImpl(final CassandraFig cassandraFig ) throws Exception {
- this.cassandraFig = cassandraFig;
+ public DataStaxClusterImpl(final CassandraConfig cassandraFig ) throws Exception {
+ this.cassandraConfig = cassandraFig;
this.cluster = buildCluster();
// always initialize the keyspaces
@@ -85,7 +86,7 @@ public class DataStaxClusterImpl implements DataStaxCluster {
// always grab cluster from getCluster() in case it was prematurely closed
if ( applicationSession == null || applicationSession.isClosed() ){
- applicationSession = getCluster().connect( CQLUtils.quote(cassandraFig.getApplicationKeyspace() ) );
+ applicationSession = getCluster().connect( CQLUtils.quote( cassandraConfig.getApplicationKeyspace() ) );
}
return applicationSession;
}
@@ -96,7 +97,7 @@ public class DataStaxClusterImpl implements DataStaxCluster {
// always grab cluster from getCluster() in case it was prematurely closed
if ( queueMessageSession == null || queueMessageSession.isClosed() ){
- queueMessageSession = getCluster().connect( CQLUtils.quote(cassandraFig.getApplicationLocalKeyspace() ) );
+ queueMessageSession = getCluster().connect( CQLUtils.quote( cassandraConfig.getApplicationLocalKeyspace() ) );
}
return queueMessageSession;
}
@@ -110,7 +111,7 @@ public class DataStaxClusterImpl implements DataStaxCluster {
public void createApplicationKeyspace() throws Exception {
boolean exists = getClusterSession().getCluster().getMetadata()
- .getKeyspace(CQLUtils.quote(cassandraFig.getApplicationKeyspace())) != null;
+ .getKeyspace(CQLUtils.quote( cassandraConfig.getApplicationKeyspace())) != null;
if(exists){
return;
@@ -118,14 +119,14 @@ public class DataStaxClusterImpl implements DataStaxCluster {
final String createApplicationKeyspace = String.format(
"CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s",
- CQLUtils.quote(cassandraFig.getApplicationKeyspace()),
- CQLUtils.getFormattedReplication(cassandraFig.getStrategy(), cassandraFig.getStrategyOptions())
+ CQLUtils.quote( cassandraConfig.getApplicationKeyspace()),
+ CQLUtils.getFormattedReplication( cassandraConfig.getStrategy(), cassandraConfig.getStrategyOptions())
);
getClusterSession().execute(createApplicationKeyspace);
- logger.info("Created keyspace: {}", cassandraFig.getApplicationKeyspace());
+ logger.info("Created keyspace: {}", cassandraConfig.getApplicationKeyspace());
}
@@ -139,7 +140,7 @@ public class DataStaxClusterImpl implements DataStaxCluster {
public void createApplicationLocalKeyspace() throws Exception {
boolean exists = getClusterSession().getCluster().getMetadata()
- .getKeyspace(CQLUtils.quote(cassandraFig.getApplicationLocalKeyspace())) != null;
+ .getKeyspace(CQLUtils.quote( cassandraConfig.getApplicationLocalKeyspace())) != null;
if (exists) {
return;
@@ -147,14 +148,14 @@ public class DataStaxClusterImpl implements DataStaxCluster {
final String createQueueMessageKeyspace = String.format(
"CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s",
- CQLUtils.quote(cassandraFig.getApplicationLocalKeyspace()),
- CQLUtils.getFormattedReplication(cassandraFig.getStrategyLocal(), cassandraFig.getStrategyOptionsLocal())
+ CQLUtils.quote( cassandraConfig.getApplicationLocalKeyspace()),
+ CQLUtils.getFormattedReplication( cassandraConfig.getStrategyLocal(), cassandraConfig.getStrategyOptionsLocal())
);
getClusterSession().execute(createQueueMessageKeyspace);
- logger.info("Created keyspace: {}", cassandraFig.getApplicationLocalKeyspace());
+ logger.info("Created keyspace: {}", cassandraConfig.getApplicationLocalKeyspace());
}
@@ -184,7 +185,7 @@ public class DataStaxClusterImpl implements DataStaxCluster {
ConsistencyLevel defaultConsistencyLevel;
try {
- defaultConsistencyLevel = ConsistencyLevel.valueOf(cassandraFig.getReadCl());
+ defaultConsistencyLevel = cassandraConfig.getDataStaxReadCl();
} catch (IllegalArgumentException e){
logger.error("Unable to parse provided consistency level in property: {}, defaulting to: {}",
@@ -196,44 +197,44 @@ public class DataStaxClusterImpl implements DataStaxCluster {
LoadBalancingPolicy loadBalancingPolicy;
- if( !cassandraFig.getLocalDataCenter().isEmpty() ){
+ if( !cassandraConfig.getLocalDataCenter().isEmpty() ){
loadBalancingPolicy = new DCAwareRoundRobinPolicy.Builder()
- .withLocalDc( cassandraFig.getLocalDataCenter() ).build();
+ .withLocalDc( cassandraConfig.getLocalDataCenter() ).build();
}else{
loadBalancingPolicy = new DCAwareRoundRobinPolicy.Builder().build();
}
final PoolingOptions poolingOptions = new PoolingOptions()
- .setCoreConnectionsPerHost(HostDistance.LOCAL, cassandraFig.getConnections() / 2)
- .setMaxConnectionsPerHost(HostDistance.LOCAL, cassandraFig.getConnections())
- .setIdleTimeoutSeconds(cassandraFig.getTimeout() / 1000)
- .setPoolTimeoutMillis(cassandraFig.getPoolTimeout());
+ .setCoreConnectionsPerHost(HostDistance.LOCAL, cassandraConfig.getConnections() / 2)
+ .setMaxConnectionsPerHost(HostDistance.LOCAL, cassandraConfig.getConnections())
+ .setIdleTimeoutSeconds( cassandraConfig.getTimeout() / 1000)
+ .setPoolTimeoutMillis( cassandraConfig.getPoolTimeout());
// purposely add a couple seconds to the driver's lower level socket timeouts vs. cassandra timeouts
final SocketOptions socketOptions = new SocketOptions()
- .setConnectTimeoutMillis(cassandraFig.getPoolTimeout() + 2000)
- .setReadTimeoutMillis(cassandraFig.getTimeout() + 2000);
+ .setConnectTimeoutMillis( cassandraConfig.getPoolTimeout() + 2000)
+ .setReadTimeoutMillis( cassandraConfig.getTimeout() + 2000);
final QueryOptions queryOptions = new QueryOptions()
.setConsistencyLevel(defaultConsistencyLevel);
Cluster.Builder datastaxCluster = Cluster.builder()
- .withClusterName(cassandraFig.getClusterName())
- .addContactPoints(cassandraFig.getHosts().split(","))
+ .withClusterName( cassandraConfig.getClusterName())
+ .addContactPoints( cassandraConfig.getHosts().split(","))
.withMaxSchemaAgreementWaitSeconds(30)
.withCompression(ProtocolOptions.Compression.LZ4)
.withLoadBalancingPolicy(loadBalancingPolicy)
.withPoolingOptions(poolingOptions)
.withQueryOptions(queryOptions)
.withSocketOptions(socketOptions)
- .withProtocolVersion(getProtocolVersion(cassandraFig.getVersion()));
+ .withProtocolVersion(getProtocolVersion( cassandraConfig.getVersion()));
// only add auth credentials if they were provided
- if ( !cassandraFig.getUsername().isEmpty() && !cassandraFig.getPassword().isEmpty() ){
+ if ( !cassandraConfig.getUsername().isEmpty() && !cassandraConfig.getPassword().isEmpty() ){
datastaxCluster.withCredentials(
- cassandraFig.getUsername(),
- cassandraFig.getPassword()
+ cassandraConfig.getUsername(),
+ cassandraConfig.getPassword()
);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
index c45fdd1..00856a5 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
@@ -126,6 +126,71 @@ public class ColumnNameIteratorTest {
return cassandraFig.getApplicationLocalKeyspace();
}
+ @Override
+ public String getLocalDataCenter() {
+ return cassandraFig.getLocalDataCenter();
+ }
+
+ @Override
+ public int getConnections() {
+ return cassandraFig.getConnections();
+ }
+
+ @Override
+ public int getTimeout() {
+ return cassandraFig.getTimeout();
+ }
+
+ @Override
+ public int getPoolTimeout() {
+ return cassandraFig.getPoolTimeout();
+ }
+
+ @Override
+ public String getClusterName() {
+ return cassandraFig.getClusterName();
+ }
+
+ @Override
+ public String getHosts() {
+ return cassandraFig.getHosts();
+ }
+
+ @Override
+ public String getVersion() {
+ return cassandraFig.getVersion();
+ }
+
+ @Override
+ public String getUsername() {
+ return cassandraFig.getUsername();
+ }
+
+ @Override
+ public String getPassword() {
+ return cassandraFig.getPassword();
+ }
+
+ @Override
+ public String getStrategy() {
+ return cassandraFig.getStrategy();
+ }
+
+ @Override
+ public String getStrategyOptions() {
+ return cassandraFig.getStrategyOptions();
+ }
+
+ @Override
+ public String getStrategyLocal() {
+ return cassandraFig.getStrategyLocal();
+ }
+
+ @Override
+ public String getStrategyOptionsLocal() {
+ return cassandraFig.getStrategyOptionsLocal();
+ }
+
};
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
index 5cdf0e1..b12ea6f 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
@@ -129,6 +129,72 @@ public class MultiKeyColumnNameIteratorTest {
return cassandraFig.getApplicationLocalKeyspace();
}
+
+ @Override
+ public String getLocalDataCenter() {
+ return cassandraFig.getLocalDataCenter();
+ }
+
+ @Override
+ public int getConnections() {
+ return cassandraFig.getConnections();
+ }
+
+ @Override
+ public int getTimeout() {
+ return cassandraFig.getTimeout();
+ }
+
+ @Override
+ public int getPoolTimeout() {
+ return cassandraFig.getPoolTimeout();
+ }
+
+ @Override
+ public String getClusterName() {
+ return cassandraFig.getClusterName();
+ }
+
+ @Override
+ public String getHosts() {
+ return cassandraFig.getHosts();
+ }
+
+ @Override
+ public String getVersion() {
+ return cassandraFig.getVersion();
+ }
+
+ @Override
+ public String getUsername() {
+ return cassandraFig.getUsername();
+ }
+
+ @Override
+ public String getPassword() {
+ return cassandraFig.getPassword();
+ }
+
+ @Override
+ public String getStrategy() {
+ return cassandraFig.getStrategy();
+ }
+
+ @Override
+ public String getStrategyOptions() {
+ return cassandraFig.getStrategyOptions();
+ }
+
+ @Override
+ public String getStrategyLocal() {
+ return cassandraFig.getStrategyLocal();
+ }
+
+ @Override
+ public String getStrategyOptionsLocal() {
+ return cassandraFig.getStrategyOptionsLocal();
+ }
+
};
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
index 6331941..e509c45 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
@@ -132,6 +132,73 @@ public class MultiRowColumnIteratorTest {
public String getApplicationLocalKeyspace() {
return cassandraFig.getApplicationLocalKeyspace();
}
+
+
+ @Override
+ public String getLocalDataCenter() {
+ return cassandraFig.getLocalDataCenter();
+ }
+
+ @Override
+ public int getConnections() {
+ return cassandraFig.getConnections();
+ }
+
+ @Override
+ public int getTimeout() {
+ return cassandraFig.getTimeout();
+ }
+
+ @Override
+ public int getPoolTimeout() {
+ return cassandraFig.getPoolTimeout();
+ }
+
+ @Override
+ public String getClusterName() {
+ return cassandraFig.getClusterName();
+ }
+
+ @Override
+ public String getHosts() {
+ return cassandraFig.getHosts();
+ }
+
+ @Override
+ public String getVersion() {
+ return cassandraFig.getVersion();
+ }
+
+ @Override
+ public String getUsername() {
+ return cassandraFig.getUsername();
+ }
+
+ @Override
+ public String getPassword() {
+ return cassandraFig.getPassword();
+ }
+
+ @Override
+ public String getStrategy() {
+ return cassandraFig.getStrategy();
+ }
+
+ @Override
+ public String getStrategyOptions() {
+ return cassandraFig.getStrategyOptions();
+ }
+
+ @Override
+ public String getStrategyLocal() {
+ return cassandraFig.getStrategyLocal();
+ }
+
+ @Override
+ public String getStrategyOptionsLocal() {
+ return cassandraFig.getStrategyOptionsLocal();
+ }
+
};
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
index a8139a1..d51fe2d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
@@ -81,14 +81,13 @@ public class QueueManagerImpl implements QueueManager {
}
}
- for ( String region : regions ) {
+ Shard available = new Shard( queue.getName(), actorSystemFig.getRegionLocal(),
+ Shard.Type.DEFAULT, 1L, QakkaUtils.getTimeUuid());
+ shardSerialization.createShard( available );
- Shard available = new Shard( queue.getName(), region, Shard.Type.DEFAULT, 1L, QakkaUtils.getTimeUuid());
- shardSerialization.createShard( available );
-
- Shard inflight = new Shard( queue.getName(), region, Shard.Type.INFLIGHT, 1L, QakkaUtils.getTimeUuid());
- shardSerialization.createShard( inflight );
- }
+ Shard inflight = new Shard( queue.getName(), actorSystemFig.getRegionLocal(),
+ Shard.Type.INFLIGHT, 1L, QakkaUtils.getTimeUuid());
+ shardSerialization.createShard( inflight );
// only write the existence of a queue to the database if its dependent initial shards have been written
queueSerialization.writeQueue(queue.toDatabaseQueue());
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
index d723e97..620e946 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
@@ -155,39 +155,43 @@ public class QueueResourceTest extends AbstractRestTest {
}};
target( "queues" ).request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE ) );
- // send some messages
+ try {
- ObjectMapper mapper = new ObjectMapper();
+ // send some messages
+
+ ObjectMapper mapper = new ObjectMapper();
- int numMessages = 100;
- for (int i = 0; i < numMessages; i++) {
+ int numMessages = 100;
+ for (int i = 0; i < numMessages; i++) {
- final int number = i;
- Map<String, Object> messageMap = new HashMap<String, Object>() {{
- put( "message", "this is message #" + number );
- put( "valid", true );
- }};
- String body = mapper.writeValueAsString( messageMap );
+ final int number = i;
+ Map<String, Object> messageMap = new HashMap<String, Object>() {{
+ put( "message", "this is message #" + number );
+ put( "valid", true );
+ }};
+ String body = mapper.writeValueAsString( messageMap );
- Response response;
- if ( asJson ) {
- response = target( "queues" ).path( queueName ).path( "messages" )
+ Response response;
+ if (asJson) {
+ response = target( "queues" ).path( queueName ).path( "messages" )
.request().post( Entity.entity( body, MediaType.APPLICATION_JSON ) );
- } else {
- response = target( "queues" ).path( queueName ).path( "messages" )
+ } else {
+ response = target( "queues" ).path( queueName ).path( "messages" )
.queryParam( "contentType", MediaType.APPLICATION_JSON )
.request().post( Entity.entity( body, MediaType.APPLICATION_OCTET_STREAM ) );
- }
+ }
- Assert.assertEquals( 200, response.getStatus() );
- }
+ Assert.assertEquals( 200, response.getStatus() );
+ }
- // get all messages, checking for dups
+ // get all messages, checking for dups
- checkJsonMessages( queueName, numMessages );
+ checkJsonMessages( queueName, numMessages );
- Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
- Assert.assertEquals( 200, response.getStatus() );
+ } finally {
+ Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
+ Assert.assertEquals( 200, response.getStatus() );
+ }
}
@@ -244,28 +248,32 @@ public class QueueResourceTest extends AbstractRestTest {
}};
target( "queues" ).request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE ) );
- // send messages each with image/jpg payload
+ try {
- InputStream is = getClass().getResourceAsStream("/qakka-duck.jpg");
- byte[] bytes = ByteStreams.toByteArray( is );
+ // send messages each with image/jpg payload
- int numMessages = 100;
- for (int i = 0; i < numMessages; i++) {
+ InputStream is = getClass().getResourceAsStream( "/qakka-duck.jpg" );
+ byte[] bytes = ByteStreams.toByteArray( is );
- Response response = target( "queues" ).path( queueName ).path( "messages" )
+ int numMessages = 100;
+ for (int i = 0; i < numMessages; i++) {
+
+ Response response = target( "queues" ).path( queueName ).path( "messages" )
.queryParam( "contentType", "image/jpg" )
.request()
- .post( Entity.entity( bytes, MediaType.APPLICATION_OCTET_STREAM ));
+ .post( Entity.entity( bytes, MediaType.APPLICATION_OCTET_STREAM ) );
- Assert.assertEquals( 200, response.getStatus() );
- }
+ Assert.assertEquals( 200, response.getStatus() );
+ }
- // get all messages, checking for dups
+ // get all messages, checking for dups
- checkBinaryMessages( queueName, numMessages );
+ checkBinaryMessages( queueName, numMessages );
- Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
- Assert.assertEquals( 200, response.getStatus() );
+ } finally {
+ Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
+ Assert.assertEquals( 200, response.getStatus() );
+ }
}
@@ -320,71 +328,75 @@ public class QueueResourceTest extends AbstractRestTest {
Map<String, Object> queueMap = new HashMap<String, Object>() {{ put("name", queueName); }};
target("queues").request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE));
- // send some messages
+ try {
- ObjectMapper mapper = new ObjectMapper();
+ // send some messages
- int numMessages = 100;
- for ( int i=0; i<numMessages; i++ ) {
+ ObjectMapper mapper = new ObjectMapper();
- final int number = i;
- Map<String, Object> messageMap = new HashMap<String, Object>() {{
- put("message", "this is message #" + number);
- put("valid", true );
- }};
- String body = mapper.writeValueAsString( messageMap );
+ int numMessages = 100;
+ for (int i = 0; i < numMessages; i++) {
- Response response = target("queues").path( queueName ).path( "messages" )
- .request().post( Entity.entity( body, MediaType.APPLICATION_JSON ));
+ final int number = i;
+ Map<String, Object> messageMap = new HashMap<String, Object>() {{
+ put( "message", "this is message #" + number );
+ put( "valid", true );
+ }};
+ String body = mapper.writeValueAsString( messageMap );
- Assert.assertEquals( 200, response.getStatus() );
- }
+ Response response = target( "queues" ).path( queueName ).path( "messages" )
+ .request().post( Entity.entity( body, MediaType.APPLICATION_JSON ) );
- // get all messages, checking for dups
+ Assert.assertEquals( 200, response.getStatus() );
+ }
- Set<UUID> messageIds = checkJsonMessages( queueName, numMessages );
+ // get all messages, checking for dups
- // there should be no more messages available
+ Set<UUID> messageIds = checkJsonMessages( queueName, numMessages );
- Response response = target( "queues" ).path( queueName ).path( "messages" ).request().get();
- ApiResponse apiResponse = response.readEntity( ApiResponse.class );
- Assert.assertNotNull( apiResponse.getQueueMessages() );
- Assert.assertTrue( apiResponse.getQueueMessages().isEmpty() );
+ // there should be no more messages available
+
+ Response response = target( "queues" ).path( queueName ).path( "messages" ).request().get();
+ ApiResponse apiResponse = response.readEntity( ApiResponse.class );
+ Assert.assertNotNull( apiResponse.getQueueMessages() );
+ Assert.assertTrue( apiResponse.getQueueMessages().isEmpty() );
- // ack half of the messages
+ // ack half of the messages
- int count = 0;
- Set<UUID> ackedIds = new HashSet<>();
- for ( UUID queueMessageId : messageIds ) {
- response = target( "queues" )
+ int count = 0;
+ Set<UUID> ackedIds = new HashSet<>();
+ for (UUID queueMessageId : messageIds) {
+ response = target( "queues" )
.path( queueName ).path( "messages" ).path( queueMessageId.toString() ).request().delete();
- Assert.assertEquals( 200, response.getStatus() );
- ackedIds.add( queueMessageId );
- if ( ++count >= numMessages/2 ) {
- break;
+ Assert.assertEquals( 200, response.getStatus() );
+ ackedIds.add( queueMessageId );
+ if (++count >= numMessages / 2) {
+ break;
+ }
}
- }
- messageIds.removeAll( ackedIds );
+ messageIds.removeAll( ackedIds );
- // wait for remaining of the messages to timeout
+ // wait for remaining of the messages to timeout
- QakkaFig qakkaFig = StartupListener.INJECTOR.getInstance( QakkaFig.class );
- Thread.sleep( 2*qakkaFig.getQueueTimeoutSeconds() * 1000 );
+ QakkaFig qakkaFig = StartupListener.INJECTOR.getInstance( QakkaFig.class );
+ Thread.sleep( 2 * qakkaFig.getQueueTimeoutSeconds() * 1000 );
- // now, the remaining messages cannot be acked because they timed out
+ // now, the remaining messages cannot be acked because they timed out
- for ( UUID queueMessageId : messageIds ) {
- response = target( "queues" )
+ for (UUID queueMessageId : messageIds) {
+ response = target( "queues" )
.path( queueName ).path( "messages" ).path( queueMessageId.toString() ).request().delete();
- Assert.assertEquals( 400, response.getStatus() );
- }
+ Assert.assertEquals( 400, response.getStatus() );
+ }
- // and, those same messages should be available again in the queue
+ // and, those same messages should be available again in the queue
- checkJsonMessages( queueName, numMessages/2 );
+ checkJsonMessages( queueName, numMessages / 2 );
- response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
- Assert.assertEquals( 200, response.getStatus() );
+ } finally {
+ Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
+ Assert.assertEquals( 200, response.getStatus() );
+ }
}
[2/5] usergrid git commit: Ensure that injector is not used in Akka
constructors to avoid "cannot serialize injector" errors
Posted by sn...@apache.org.
Ensure that injector is not used in Akka constructors to avoid "cannot serialize injector" errors
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d2153285
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d2153285
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d2153285
Branch: refs/heads/usergrid-1318-queue
Commit: d2153285dae4c9257f3b62ab533186f31f770926
Parents: fe08fa7
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Oct 5 17:41:26 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Oct 5 17:41:26 2016 -0400
----------------------------------------------------------------------
.../persistence/actorsystem/GuiceActorProducer.java | 8 ++++----
.../uniquevalues/UniqueValuesServiceImpl.java | 2 +-
.../distributed/actors/QueueTimeouterTest.java | 16 +++++++++-------
.../distributed/actors/ShardAllocatorTest.java | 4 ++--
4 files changed, 16 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d2153285/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/GuiceActorProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/GuiceActorProducer.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/GuiceActorProducer.java
index 9304c4c..d73f760 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/GuiceActorProducer.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/GuiceActorProducer.java
@@ -25,11 +25,11 @@ import com.google.inject.Injector;
public class GuiceActorProducer implements IndirectActorProducer {
- final Injector injector;
+ public static Injector INJECTOR = null;
+
final Class<? extends Actor> actorClass;
- public GuiceActorProducer(Injector injector, Class<? extends Actor> actorClass) {
- this.injector = injector;
+ public GuiceActorProducer(Class<? extends Actor> actorClass) {
this.actorClass = actorClass;
}
@@ -40,7 +40,7 @@ public class GuiceActorProducer implements IndirectActorProducer {
@Override
public Actor produce() {
- return injector.getInstance( actorClass );
+ return INJECTOR.getInstance( actorClass );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d2153285/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
index 8bdb02c..08de853 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
@@ -292,7 +292,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
ClusterSingletonManagerSettings.create( system ).withRole("io");
system.actorOf( ClusterSingletonManager.props(
- Props.create( GuiceActorProducer.class, injector, UniqueValuesRouter.class ),
+ Props.create( GuiceActorProducer.class, UniqueValuesRouter.class ),
PoisonPill.getInstance(), settings ), "uvRouter" );
ClusterSingletonProxySettings proxySettings =
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d2153285/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
index 3079773..3bf87a6 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.qakka.App;
import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.core.CassandraClient;
import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueTimeoutRequest;
import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
@@ -58,14 +59,15 @@ public class QueueTimeouterTest extends AbstractTest {
@Test
public void testBasicOperation() throws Exception {
- CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
-
Injector injector = getInjector();
- QakkaFig qakkaFig = getInjector().getInstance( QakkaFig.class );
- ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
- QueueMessageSerialization qms = getInjector().getInstance( QueueMessageSerialization.class );
- ShardSerialization shardSerialization = getInjector().getInstance( ShardSerialization.class );
+ injector.getInstance( DistributedQueueService.class ); // init the INJECTOR
+
+ CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
+ QakkaFig qakkaFig = injector.getInstance( QakkaFig.class );
+ ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
+ QueueMessageSerialization qms = injector.getInstance( QueueMessageSerialization.class );
+ ShardSerialization shardSerialization = injector.getInstance( ShardSerialization.class );
// create records in inflight table, with some being old enough to time out
@@ -113,7 +115,7 @@ public class QueueTimeouterTest extends AbstractTest {
ActorSystem system = ActorSystem.create("Test-" + queueName);
ActorRef timeouterRef = system.actorOf( Props.create(
- GuiceActorProducer.class, injector, QueueTimeouter.class), "timeouter");
+ GuiceActorProducer.class, QueueTimeouter.class), "timeouter");
QueueTimeoutRequest qtr = new QueueTimeoutRequest( queueName );
timeouterRef.tell( qtr, null ); // tell sends message, returns immediately
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d2153285/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
index c6831b7..919673c 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
@@ -69,7 +69,7 @@ public class ShardAllocatorTest extends AbstractTest {
CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
- injector.getInstance( App.class ); // init the INJECTOR
+ injector.getInstance( DistributedQueueService.class ); // init the INJECTOR
ShardSerialization shardSer = injector.getInstance( ShardSerialization.class );
QakkaFig qakkaFig = injector.getInstance( QakkaFig.class );
@@ -113,7 +113,7 @@ public class ShardAllocatorTest extends AbstractTest {
ActorSystem system = ActorSystem.create("Test-" + queueName);
ActorRef shardAllocRef = system.actorOf( Props.create(
- GuiceActorProducer.class, injector, ShardAllocator.class), "shardallocator");
+ GuiceActorProducer.class, ShardAllocator.class), "shardallocator");
ShardCheckRequest checkRequest = new ShardCheckRequest( queueName );
shardAllocRef.tell( checkRequest, null ); // tell sends message, returns immediately
[3/5] usergrid git commit: Logging improvements
Posted by sn...@apache.org.
Logging improvements
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/e3285843
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/e3285843
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/e3285843
Branch: refs/heads/usergrid-1318-queue
Commit: e3285843260c361b3126c5d235068d38802ff164
Parents: d215328
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Oct 5 17:42:21 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Oct 5 17:42:21 2016 -0400
----------------------------------------------------------------------
.../persistence/qakka/MetricsService.java | 20 ++++-----
.../qakka/core/impl/InMemoryQueue.java | 2 +-
.../distributed/actors/QueueActorHelper.java | 37 +----------------
.../queue/LegacyQueueManagerTest.java | 43 ++++++++++----------
4 files changed, 33 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e3285843/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/MetricsService.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/MetricsService.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/MetricsService.java
index 378ba0d..b032be6 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/MetricsService.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/MetricsService.java
@@ -24,16 +24,16 @@ import com.codahale.metrics.MetricRegistry;
public interface MetricsService {
- String SEND_TIME_TOTAL = "org.apache.usergrid.persistence.qakka.send.time.total";
- String SEND_TIME_SEND = "org.apache.usergrid.persistence.qakka.send.time.send";
- String SEND_TIME_WRITE = "org.apache.usergrid.persistence.qakka.send.time.write";
- String GET_TIME_TOTAL = "org.apache.usergrid.persistence.qakka.get.time.total";
- String GET_TIME_GET = "org.apache.usergrid.persistence.qakka.get.time.get";
- String ACK_TIME_TOTAL = "org.apache.usergrid.persistence.qakka.ack.time.total";
- String ACK_TIME_ACK = "org.apache.usergrid.persistence.qakka.ack.time.ack";
- String TIMEOUT_TIME = "org.apache.usergrid.persistence.qakka.timeout.time";
- String REFRESH_TIME = "org.apache.usergrid.persistence.qakka.timeout.time";
- String ALLOCATE_TIME = "org.apache.usergrid.persistence.qakka.allocate.time";
+ String SEND_TIME_TOTAL = "qakka.send.time.total";
+ String SEND_TIME_SEND = "qakka.send.time.send";
+ String SEND_TIME_WRITE = "qakka.send.time.write";
+ String GET_TIME_TOTAL = "qakka.get.time.total";
+ String GET_TIME_GET = "qakka.get.time.get";
+ String ACK_TIME_TOTAL = "qakka.ack.time.total";
+ String ACK_TIME_ACK = "qakka.ack.time.ack";
+ String TIMEOUT_TIME = "qakka.timeout.time";
+ String REFRESH_TIME = "qakka.refresh.time";
+ String ALLOCATE_TIME = "qakka.allocate.time";
MetricRegistry getMetricRegistry();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e3285843/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
index e70ec75..6a26483 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
@@ -65,7 +65,7 @@ public class InMemoryQueue {
} else {
if ( databaseQueueMessage.getQueueMessageId().timestamp() > newest.timestamp() ) {
newest = databaseQueueMessage.getQueueMessageId();
- logger.debug("New newest for queue {} is {}", queueName, newest.timestamp());
+ //logger.debug("New newest for queue {} is {}", queueName, newest.timestamp());
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e3285843/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
index 0b31e16..e3996c5 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
@@ -119,7 +119,7 @@ public class QueueActorHelper {
}
}
- //logger.debug("{} returning {} for queue {}", this, queueMessages.size(), queueName);
+ logger.debug("{} returning {} for queue {}", this, queueMessages.size(), queueName);
return queueMessages;
}
@@ -251,12 +251,6 @@ public class QueueActorHelper {
UUID since = inMemoryQueue.getNewest( queueName );
-// if ( since != null ) {
-// logger.debug( "Loading queue {} messages newer than {}", queueName, since.timestamp() );
-// } else {
-// logger.debug( "Loading queue {} messages newer than [null]", queueName );
-// }
-
String region = actorSystemFig.getRegionLocal();
MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT,
@@ -271,35 +265,6 @@ public class QueueActorHelper {
count++;
}
-// long runs = runCount.incrementAndGet();
-// long readCount = totalRead.addAndGet( count );
-//
-// if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
-//
-// final DecimalFormat format = new DecimalFormat("##.###");
-// final long nano = 1000000000;
-// Timer t = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME );
-//
-// logger.debug("QueueRefresher for queue '{}' stats:\n" +
-// " Num runs={}\n" +
-// " Read count={}\n" +
-// " Mean={}\n" +
-// " One min rate={}\n" +
-// " Five min rate={}\n" +
-// " Snapshot mean={}\n" +
-// " Snapshot min={}\n" +
-// " Snapshot max={}",
-// queueName,
-// t.getCount(),
-// readCount,
-// format.format( t.getMeanRate() ),
-// format.format( t.getOneMinuteRate() ),
-// format.format( t.getFiveMinuteRate() ),
-// format.format( t.getSnapshot().getMean() / nano ),
-// format.format( (double) t.getSnapshot().getMin() / nano ),
-// format.format( (double) t.getSnapshot().getMax() / nano ) );
-// }
-
if ( count > 0 ) {
logger.debug( "Added {} in-memory for queue {}, new size = {}",
count, queueName, inMemoryQueue.size( queueName ) );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e3285843/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
index 65c3309..92075b6 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
@@ -29,7 +29,6 @@ import org.apache.usergrid.persistence.qakka.core.CassandraClient;
import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
-import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
@@ -143,41 +142,41 @@ public class LegacyQueueManagerTest extends AbstractTest {
final LegacyQueueScopeImpl scope =
new LegacyQueueScopeImpl( "testQueue" + queueSeed++, LegacyQueueScope.RegionImplementation.LOCAL );
LegacyQueueManagerFactory qmf = myInjector.getInstance( LegacyQueueManagerFactory.class );
- LegacyQueueManager qm = qmf.getQueueManager(scope);
+ LegacyQueueManager qm = qmf.getQueueManager( scope );
- HashMap<String,String> values = new HashMap<>();
- values.put("test", "Test");
+ HashMap<String, String> values = new HashMap<>();
+ values.put( "test", "Test" );
- List<Map<String,String>> bodies = new ArrayList<>();
- bodies.add(values);
+ List<Map<String, String>> bodies = new ArrayList<>();
+ bodies.add( values );
long initialDepth = qm.getQueueDepth();
- qm.sendMessages(bodies);
+ qm.sendMessages( bodies );
long depth = 0;
- for(int i=0; i<10;i++){
- depth = qm.getQueueDepth();
- if(depth>0){
+ for (int i = 0; i < 10; i++) {
+ depth = qm.getQueueDepth();
+ if (depth > 0) {
break;
}
- Thread.sleep(1000);
+ Thread.sleep( 1000 );
}
- assertTrue(depth>0);
+ assertTrue( depth > 0 );
- List<LegacyQueueMessage> messageList = qm.getMessages(10, values.getClass());
- assertTrue(messageList.size() <= 500);
- for(LegacyQueueMessage message : messageList){
- assertTrue(message.getBody().equals(values));
+ List<LegacyQueueMessage> messageList = qm.getMessages( 10, values.getClass() );
+ assertTrue( messageList.size() <= 500 );
+ for (LegacyQueueMessage message : messageList) {
+ assertTrue( message.getBody().equals( values ) );
}
- if(messageList.size()>0) {
- qm.commitMessages(messageList);
+ if (messageList.size() > 0) {
+ qm.commitMessages( messageList );
}
- for(int i=0; i<10;i++){
+ for (int i = 0; i < 10; i++) {
depth = qm.getQueueDepth();
- if(depth==initialDepth){
+ if (depth == initialDepth) {
break;
}
- Thread.sleep(1000);
+ Thread.sleep( 1000 );
}
- assertEquals(initialDepth, depth);
+ assertEquals( initialDepth, depth );
DistributedQueueService distributedQueueService = myInjector.getInstance( DistributedQueueService.class );
distributedQueueService.shutdown();
[4/5] usergrid git commit: Queue info REST end-point
Posted by sn...@apache.org.
Queue info REST end-point
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/202d5bed
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/202d5bed
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/202d5bed
Branch: refs/heads/usergrid-1318-queue
Commit: 202d5beda527e54852c86439c9c65d6ab2157ad8
Parents: e328584
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Oct 5 17:42:42 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Oct 5 17:42:42 2016 -0400
----------------------------------------------------------------------
.../rest/system/QueueSystemResource.java | 78 ++++++++++++++++++++
1 file changed, 78 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/202d5bed/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java
index 15bdb34..86cd387 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java
@@ -19,9 +19,18 @@
*/
package org.apache.usergrid.rest.system;
+import com.codahale.metrics.Timer;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.Queue;
+import org.apache.usergrid.persistence.qakka.core.QueueManager;
+import org.apache.usergrid.persistence.qakka.core.QueueMessageManager;
+import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
+import org.apache.usergrid.persistence.qakka.core.impl.QueueManagerImpl;
+import org.apache.usergrid.persistence.qakka.core.impl.QueueMessageManagerImpl;
import org.apache.usergrid.rest.AbstractContextResource;
import org.apache.usergrid.rest.ApiResponse;
import org.apache.usergrid.rest.security.annotations.RequireSystemAccess;
@@ -32,6 +41,10 @@ import org.springframework.stereotype.Component;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.text.DecimalFormat;
+import java.util.*;
/**
* retrieves queue stats
@@ -47,6 +60,7 @@ public class QueueSystemResource extends AbstractContextResource {
public QueueSystemResource(){logger.info("queue resource initialized");}
+
/**
* Return queue depth of this Usergrid instance in JSON format.
*
@@ -74,4 +88,68 @@ public class QueueSystemResource extends AbstractContextResource {
return response;
}
+
+ @GET
+ @RequireSystemAccess
+ @Path("info")
+ public ApiResponse getQueueInfo(
+ @QueryParam("callback") @DefaultValue("callback") String callback ) {
+
+ ApiResponse response = createApiResponse();
+ response.setAction( "get queue info" );
+
+ MetricsService metricsService = injector.getInstance( MetricsService.class );
+
+ final DecimalFormat format = new DecimalFormat("##.###");
+ final long nano = 1000000000;
+
+ Map<String, Object> info = new HashMap<String, Object>() {{
+ put( "name", "Queue Info" );
+ try {
+ put( "host", InetAddress.getLocalHost().getHostName() );
+ } catch (UnknownHostException e) {
+ put( "host", "unknown" );
+ }
+ SortedSet<String> names = metricsService.getMetricRegistry().getNames();
+ for (String name : names) {
+ Timer t = metricsService.getMetricRegistry().timer( name );
+ put( name, new HashMap<String, Object>() {{
+ put( "count", "" + t.getCount() );
+ put( "mean_rate", "" + format.format( t.getMeanRate() ) );
+ put( "one_minute_rate", "" + format.format( t.getOneMinuteRate() ) );
+ put( "five_minute_rate", "" + format.format( t.getFiveMinuteRate() ) );
+ put( "mean (s)", "" + format.format( t.getSnapshot().getMean() / nano ) );
+ put( "min (s)", "" + format.format( (double) t.getSnapshot().getMin() / nano ) );
+ put( "max (s)", "" + format.format( (double) t.getSnapshot().getMax() / nano ) );
+ }} );
+ }
+ }};
+
+ QueueManager queueManager = injector.getInstance( QueueManagerImpl.class );
+ QueueMessageManager queueMessageManager = injector.getInstance( QueueMessageManagerImpl.class );
+ InMemoryQueue inMemoryQueue = injector.getInstance( InMemoryQueue.class );
+
+ List queues = new ArrayList();
+ final List<String> listOfQueues = queueManager.getListOfQueues();
+ for ( String queueName : listOfQueues ) {
+
+ Map<String, Object> queueInfo = new HashMap<>();
+
+ queueInfo.put("name", queueName );
+ queueInfo.put("depth", queueMessageManager.getQueueDepth( queueName ));
+ queueInfo.put("inmemory", inMemoryQueue.size( queueName ));
+
+ UUID newest = inMemoryQueue.getNewest( queueName );
+ queueInfo.put("since", newest == null ? "null" : newest.timestamp());
+
+ queues.add( queueInfo );
+ }
+
+ info.put("queues", queues);
+
+ response.setProperty( "data", info );
+
+ return response;
+ }
+
}