You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/16 22:34:40 UTC
[11/25] usergrid git commit: Initial integration of Qakka into
Usergrid Queue module,
and implementation of Qakka-based LegacyQueueManager implementation.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
new file mode 100644
index 0000000..fcd2161
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import akka.actor.UntypedActor;
+import com.codahale.metrics.Timer;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueTimeoutRequest;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.UUID;
+
+
+public class QueueTimeouter extends UntypedActor {
+ private static final Logger logger = LoggerFactory.getLogger( QueueTimeouter.class );
+
+ private final String queueName;
+
+ private final QueueMessageSerialization messageSerialization;
+ private final MetricsService metricsService;
+ private final ActorSystemFig actorSystemFig;
+ private final QakkaFig qakkaFig;
+ private final CassandraClient cassandraClient;
+
+
+ public QueueTimeouter(String queueName ) {
+ this.queueName = queueName;
+
+ Injector injector = App.INJECTOR;
+
+ messageSerialization = injector.getInstance( QueueMessageSerialization.class );
+ actorSystemFig = injector.getInstance( ActorSystemFig.class );
+ qakkaFig = injector.getInstance( QakkaFig.class );
+ metricsService = injector.getInstance( MetricsService.class );
+ cassandraClient = injector.getInstance( CassandraClientImpl.class );
+ }
+
+
+ @Override
+ public void onReceive(Object message) {
+
+ if ( message instanceof QueueTimeoutRequest) {
+
+ Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.TIMEOUT_TIME).time();
+
+ try {
+
+ QueueTimeoutRequest request = (QueueTimeoutRequest) message;
+
+ if (!request.getQueueName().equals( queueName )) {
+ throw new QakkaRuntimeException(
+ "QueueTimeouter for " + queueName + ": Incorrect queueName " + request.getQueueName() );
+ }
+
+ //logger.debug("Processing timeouts for queue {} ", queueName );
+
+ int count = 0;
+ String region = actorSystemFig.getRegionLocal();
+
+ ShardIterator shardIterator = new ShardIterator(
+ cassandraClient, queueName, region, Shard.Type.INFLIGHT, Optional.empty());
+
+ MultiShardMessageIterator multiShardIteratorInflight = new MultiShardMessageIterator(
+ cassandraClient, queueName, region, DatabaseQueueMessage.Type.INFLIGHT, shardIterator, null);
+
+ while ( multiShardIteratorInflight.hasNext() ) {
+
+ DatabaseQueueMessage queueMessage = multiShardIteratorInflight.next();
+
+ long currentTime = System.currentTimeMillis();
+
+ if ((currentTime - queueMessage.getInflightAt()) > qakkaFig.getQueueTimeoutSeconds() * 1000) {
+
+ // put message back in messages_available table as new queue message with new UUID
+
+ UUID newQueueMessageId = QakkaUtils.getTimeUuid();
+
+ DatabaseQueueMessage newMessage = new DatabaseQueueMessage(
+ queueMessage.getMessageId(),
+ DatabaseQueueMessage.Type.DEFAULT,
+ queueMessage.getQueueName(),
+ queueMessage.getRegion(),
+ null,
+ queueMessage.getQueuedAt(),
+ queueMessage.getInflightAt(),
+ newQueueMessageId );
+
+ messageSerialization.writeMessage( newMessage );
+
+ // remove message from inflight table
+
+ messageSerialization.deleteMessage(
+ queueName,
+ actorSystemFig.getRegionLocal(),
+ null,
+ DatabaseQueueMessage.Type.INFLIGHT,
+ queueMessage.getQueueMessageId() );
+
+ count++;
+ }
+ }
+
+ if (count > 0) {
+ logger.debug( "Timed out {} messages for queue {}", count, queueName );
+ }
+
+ } finally {
+ timer.close();
+ }
+
+ } else {
+ unhandled( message );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
new file mode 100644
index 0000000..6c91eb0
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import akka.actor.UntypedActor;
+import com.codahale.metrics.Timer;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResponse;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+
+public class QueueWriter extends UntypedActor {
+ private static final Logger logger = LoggerFactory.getLogger( QueueWriter.class );
+
+ public enum WriteStatus { SUCCESS_XFERLOG_DELETED, SUCCESS_XFERLOG_NOTDELETED, ERROR };
+
+ private final QueueMessageSerialization messageSerialization;
+ private final TransferLogSerialization transferLogSerialization;
+ private final AuditLogSerialization auditLogSerialization;
+ private final MetricsService metricsService;
+
+
+ public QueueWriter() {
+
+ Injector injector = App.INJECTOR;
+
+ messageSerialization = injector.getInstance( QueueMessageSerialization.class );
+ transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
+ auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
+ metricsService = injector.getInstance( MetricsService.class );
+ }
+
+ @Override
+ public void onReceive(Object message) {
+
+ if (message instanceof QueueWriteRequest) {
+
+ Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_WRITE ).time();
+
+ try {
+ QueueWriteRequest qa = (QueueWriteRequest) message;
+
+ UUID queueMessageId = QakkaUtils.getTimeUuid();
+
+ // TODO: implement deliveryTime and expirationTime
+
+ DatabaseQueueMessage dbqm = null;
+ long currentTime = System.currentTimeMillis();
+
+ try {
+ dbqm = new DatabaseQueueMessage(
+ qa.getMessageId(),
+ DatabaseQueueMessage.Type.DEFAULT,
+ qa.getQueueName(),
+ qa.getDestRegion(),
+ null,
+ currentTime,
+ currentTime,
+ queueMessageId );
+
+ messageSerialization.writeMessage( dbqm );
+
+ //logger.debug("Wrote queue message id {} to queue name {}",
+ // dbqm.getQueueMessageId(), dbqm.getQueueName());
+
+ } catch (Throwable t) {
+ logger.debug("Error creating database queue message", t);
+
+ auditLogSerialization.recordAuditLog(
+ AuditLog.Action.SEND,
+ AuditLog.Status.ERROR,
+ qa.getQueueName(),
+ qa.getDestRegion(),
+ qa.getMessageId(),
+ dbqm.getMessageId() );
+
+ getSender().tell( new QueueWriteResponse(
+ QueueWriter.WriteStatus.ERROR ), getSender() );
+
+ return;
+ }
+
+ auditLogSerialization.recordAuditLog(
+ AuditLog.Action.SEND,
+ AuditLog.Status.SUCCESS,
+ qa.getQueueName(),
+ qa.getDestRegion(),
+ qa.getMessageId(),
+ dbqm.getQueueMessageId() );
+
+ try {
+ transferLogSerialization.removeTransferLog(
+ qa.getQueueName(),
+ qa.getSourceRegion(),
+ qa.getDestRegion(),
+ qa.getMessageId() );
+
+ getSender().tell( new QueueWriteResponse(
+ QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED ), getSender() );
+
+ } catch (Throwable e) {
+ logger.error("Error deleting transferlog", e);
+ logger.debug( "Unable to delete transfer log for {} {} {} {}",
+ qa.getQueueName(),
+ qa.getSourceRegion(),
+ qa.getDestRegion(),
+ qa.getMessageId() );
+
+ getSender().tell( new QueueWriteResponse(
+ QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ), getSender() );
+ }
+
+ } finally {
+ timer.close();
+ }
+
+ } else {
+ unhandled( message );
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
new file mode 100644
index 0000000..9cf06d9
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.routing.FromConfig;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
+
+
+/**
+ * Route messages to QueueWriters
+ */
+public class QueueWriterRouter extends UntypedActor {
+
+ private final ActorRef router;
+
+
+ public QueueWriterRouter() {
+
+ router = getContext().actorOf(
+ FromConfig.getInstance().props(Props.create(QueueWriter.class )), "router");
+ }
+
+ @Override
+ public void onReceive(Object message) {
+
+ if ( message instanceof QueueWriteRequest) {
+ router.tell( message, getSender() );
+
+ } else {
+ unhandled(message);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
new file mode 100644
index 0000000..46e4906
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+
+import akka.actor.UntypedActor;
+import com.codahale.metrics.Timer;
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.distributed.messages.ShardCheckRequest;
+import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardCounterSerializationImpl;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardSerializationImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.UUID;
+
+
+public class ShardAllocator extends UntypedActor {
+ private static final Logger logger = LoggerFactory.getLogger( ShardAllocator.class );
+
+ private final String queueName;
+
+ private final QakkaFig qakkaFig;
+ private final ActorSystemFig actorSystemFig;
+ private final ShardSerialization shardSerialization;
+ private final ShardCounterSerialization shardCounterSerialization;
+ private final MetricsService metricsService;
+ private final CassandraClient cassandraClient;
+
+
+ public ShardAllocator( String queueName ) {
+ this.queueName = queueName;
+
+ Injector injector = App.INJECTOR;
+
+ this.qakkaFig = injector.getInstance( QakkaFig.class );
+ this.shardCounterSerialization = injector.getInstance( ShardCounterSerializationImpl.class );
+ this.shardSerialization = injector.getInstance( ShardSerializationImpl.class );
+ this.actorSystemFig = injector.getInstance( ActorSystemFig.class );
+ this.metricsService = injector.getInstance( MetricsService.class );
+ this.cassandraClient = injector.getInstance( CassandraClientImpl.class );
+
+ logger.debug( "Created shard allocator for queue {}", queueName );
+ }
+
+
+ @Override
+ public void onReceive( Object message ) throws Exception {
+
+ if ( message instanceof ShardCheckRequest) {
+
+ ShardCheckRequest request = (ShardCheckRequest) message;
+
+ if (!request.getQueueName().equals( queueName )) {
+ throw new QakkaRuntimeException(
+ "ShardAllocator for " + queueName + ": Incorrect queueName " + request.getQueueName() );
+ }
+
+ // check both types of shard
+ checkLatestShard( Shard.Type.DEFAULT );
+ checkLatestShard( Shard.Type.INFLIGHT );
+
+ } else {
+ unhandled( message );
+ }
+
+ }
+
+ private void checkLatestShard( Shard.Type type ) {
+
+ Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ALLOCATE_TIME).time();
+
+ try {
+
+ String region = actorSystemFig.getRegionLocal();
+
+ // find newest shard
+
+ ShardIterator shardIterator = new ShardIterator(
+ cassandraClient, queueName, region, type, Optional.empty() );
+
+ Shard shard = null;
+ while (shardIterator.hasNext()) {
+ shard = shardIterator.next();
+ }
+
+ if (shard == null) {
+ logger.warn( "No shard found for {}, {}, {}", queueName, region, type );
+ return;
+ }
+
+ // if its count is greater than 90% of max shard size, then allocate a new shard
+
+ long counterValue = 0;
+ try {
+ counterValue = shardCounterSerialization.getCounterValue( queueName, type, shard.getShardId() );
+ } catch ( NotFoundException ignored ) {}
+
+ if (counterValue > (0.9 * qakkaFig.getMaxShardSize())) {
+
+ // Create UUID from a UNIX timestamp via DataStax utility
+ // https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/utils/UUIDs.html
+ UUID futureUUID = UUIDs.startOf(
+ System.currentTimeMillis() + qakkaFig.getShardAllocationAdvanceTimeMillis());
+
+ Shard newShard = new Shard( queueName, region, type, shard.getShardId() + 1, futureUUID );
+ shardSerialization.createShard( newShard );
+ shardCounterSerialization.incrementCounter( queueName, type, newShard.getShardId(), 0 );
+
+ logger.info("{} Created new shard for queue {} shardId {} timestamp {} counterValue {}",
+ this.hashCode(), queueName, shard.getShardId(), futureUUID.timestamp(), counterValue );
+ }
+
+ } catch ( Throwable t ) {
+ logger.error("Error while checking shard allocations", t);
+
+ } finally {
+ timer.close();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
new file mode 100644
index 0000000..9551c61
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.impl;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.QueueManager;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.distributed.messages.*;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+
+@Singleton
+public class DistributedQueueServiceImpl implements DistributedQueueService {
+
+ private static final Logger logger = LoggerFactory.getLogger( DistributedQueueServiceImpl.class );
+
+ private final ActorSystemManager actorSystemManager;
+ private final QueueManager queueManager;
+ private final QakkaFig qakkaFig;
+
+
+ @Inject
+ public DistributedQueueServiceImpl(
+ ActorSystemManager actorSystemManager,
+ QueueManager queueManager,
+ QakkaFig qakkaFig,
+ QueueActorRouterProducer queueActorRouterProducer,
+ QueueWriterRouterProducer queueWriterRouterProducer,
+ QueueSenderRouterProducer queueSenderRouterProducer ) {
+
+ this.actorSystemManager = actorSystemManager;
+ this.queueManager = queueManager;
+ this.qakkaFig = qakkaFig;
+
+ actorSystemManager.registerRouterProducer( queueActorRouterProducer );
+ actorSystemManager.registerRouterProducer( queueWriterRouterProducer );
+ actorSystemManager.registerRouterProducer( queueSenderRouterProducer );
+ }
+
+
+ @Override
+ public void init() {
+ for ( String queueName : queueManager.getListOfQueues() ) {
+ initQueue( queueName );
+ }
+ }
+
+
+ @Override
+ public void initQueue(String queueName) {
+ QueueInitRequest request = new QueueInitRequest( queueName );
+ ActorRef clientActor = actorSystemManager.getClientActor();
+ clientActor.tell( request, null );
+ }
+
+
+ @Override
+ public void refresh() {
+ for ( String queueName : queueManager.getListOfQueues() ) {
+ refreshQueue( queueName );
+ }
+ }
+
+
+ @Override
+ public void refreshQueue(String queueName) {
+ QueueRefreshRequest request = new QueueRefreshRequest( queueName );
+ ActorRef clientActor = actorSystemManager.getClientActor();
+ clientActor.tell( request, null );
+ }
+
+
+ @Override
+ public void processTimeouts() {
+
+ for ( String queueName : queueManager.getListOfQueues() ) {
+
+ QueueTimeoutRequest request = new QueueTimeoutRequest( queueName );
+
+ ActorRef clientActor = actorSystemManager.getClientActor();
+ clientActor.tell( request, null );
+ }
+ }
+
+
+ @Override
+ public DistributedQueueService.Status sendMessageToRegion(
+ String queueName, String sourceRegion, String destRegion, UUID messageId,
+ Long deliveryTime, Long expirationTime ) {
+
+ List<String> queueNames = queueManager.getListOfQueues();
+ if ( !queueNames.contains( queueName ) ) {
+ throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+ }
+
+ int maxRetries = qakkaFig.getMaxSendRetries();
+ int retries = 0;
+
+ QueueSendRequest request = new QueueSendRequest(
+ queueName, sourceRegion, destRegion, messageId, deliveryTime, expirationTime );
+
+ while ( retries++ < maxRetries ) {
+ try {
+ Timeout t = new Timeout( qakkaFig.getSendTimeoutSeconds(), TimeUnit.SECONDS );
+
+ // send to current region via local clientActor
+ ActorRef clientActor = actorSystemManager.getClientActor();
+ Future<Object> fut = Patterns.ask( clientActor, request, t );
+
+ // wait for response...
+ final Object response = Await.result( fut, t.duration() );
+
+ if ( response != null && response instanceof QueueSendResponse) {
+ QueueSendResponse qarm = (QueueSendResponse)response;
+
+ if ( !DistributedQueueService.Status.ERROR.equals( qarm.getSendStatus() )) {
+
+ if ( retries > 1 ) {
+ logger.debug("SUCCESS after {} retries", retries );
+ }
+ return qarm.getSendStatus();
+
+ } else {
+ logger.debug("ERROR STATUS sending to queue, retrying {}", retries );
+ }
+
+ } else if ( response != null ) {
+ logger.debug("NULL RESPONSE sending to queue, retrying {}", retries );
+
+ } else {
+ logger.debug("TIMEOUT sending to queue, retrying {}", retries );
+ }
+
+ } catch ( Exception e ) {
+ logger.debug("ERROR sending to queue, retrying " + retries, e );
+ }
+ }
+
+ throw new QakkaRuntimeException( "Error sending to queue after " + retries );
+ }
+
+
+ @Override
+ public Collection<DatabaseQueueMessage> getNextMessages( String queueName, int count ) {
+
+ List<String> queueNames = queueManager.getListOfQueues();
+ if ( !queueNames.contains( queueName ) ) {
+ throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+ }
+
+ int maxRetries = qakkaFig.getMaxGetRetries();
+ int retries = 0;
+
+ QueueGetRequest request = new QueueGetRequest( queueName, count );
+ while ( retries++ < maxRetries ) {
+ try {
+ Timeout t = new Timeout( qakkaFig.getGetTimeoutSeconds(), TimeUnit.SECONDS );
+
+ // ask ClientActor and wait (up to timeout) for response
+
+ Future<Object> fut = Patterns.ask( actorSystemManager.getClientActor(), request, t );
+ final QakkaMessage response = (QakkaMessage)Await.result( fut, t.duration() );
+
+ if ( response != null && response instanceof QueueGetResponse) {
+ QueueGetResponse qprm = (QueueGetResponse)response;
+ if ( qprm.isSuccess() ) {
+ if (retries > 1) {
+ logger.debug( "getNextMessage SUCCESS after {} retries", retries );
+ }
+ }
+ return qprm.getQueueMessages();
+
+
+ } else if ( response != null ) {
+ logger.debug("ERROR RESPONSE popping queue, retrying {}", retries );
+
+ } else {
+ logger.debug("TIMEOUT popping to queue, retrying {}", retries );
+ }
+
+ } catch ( Exception e ) {
+ logger.debug("ERROR popping to queue, retrying " + retries, e );
+ }
+ }
+
+ throw new QakkaRuntimeException(
+ "Error getting from queue " + queueName + " after " + retries );
+ }
+
+
+ @Override
+ public Status ackMessage(String queueName, UUID queueMessageId ) {
+
+ List<String> queueNames = queueManager.getListOfQueues();
+ if ( !queueNames.contains( queueName ) ) {
+ return Status.BAD_REQUEST;
+ }
+
+ QueueAckRequest message = new QueueAckRequest( queueName, queueMessageId );
+ return sendMessageToLocalQueueActors( message );
+ }
+
+
+ @Override
+ public Status requeueMessage(String queueName, UUID messageId) {
+
+ List<String> queueNames = queueManager.getListOfQueues();
+ if ( !queueNames.contains( queueName ) ) {
+ throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+ }
+
+ QueueAckRequest message = new QueueAckRequest( queueName, messageId );
+ return sendMessageToLocalQueueActors( message );
+ }
+
+
+ @Override
+ public Status clearMessages(String queueName) {
+
+ List<String> queueNames = queueManager.getListOfQueues();
+ if ( !queueNames.contains( queueName ) ) {
+ throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+ }
+
+ // TODO: implement clear queue
+ throw new UnsupportedOperationException();
+ }
+
+
+ private Status sendMessageToLocalQueueActors( QakkaMessage message ) {
+
+ int maxRetries = 5;
+ int retries = 0;
+
+ while ( retries++ < maxRetries ) {
+ try {
+ Timeout t = new Timeout( 1, TimeUnit.SECONDS );
+
+ // ask ClientActor and wait (up to timeout) for response
+
+ Future<Object> fut = Patterns.ask( actorSystemManager.getClientActor(), message, t );
+ final QakkaMessage response = (QakkaMessage)Await.result( fut, t.duration() );
+
+ if ( response != null && response instanceof QueueAckResponse) {
+ QueueAckResponse qprm = (QueueAckResponse)response;
+ return qprm.getStatus();
+
+ } else if ( response != null ) {
+ logger.debug("ERROR RESPONSE sending message, retrying {}", retries );
+
+ } else {
+ logger.debug("TIMEOUT sending message, retrying {}", retries );
+ }
+
+ } catch ( Exception e ) {
+ logger.debug("ERROR sending message, retrying " + retries, e );
+ }
+ }
+
+ throw new QakkaRuntimeException(
+ "Error sending message " + message + "after " + retries );
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
new file mode 100644
index 0000000..8c6adda
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.impl;
+
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.cluster.singleton.ClusterSingletonManager;
+import akka.cluster.singleton.ClusterSingletonManagerSettings;
+import akka.cluster.singleton.ClusterSingletonProxy;
+import akka.cluster.singleton.ClusterSingletonProxySettings;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
+import org.apache.usergrid.persistence.actorsystem.RouterProducer;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.distributed.actors.QueueActorRouter;
+import org.apache.usergrid.persistence.qakka.distributed.messages.*;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class QueueActorRouterProducer implements RouterProducer {
+
+ static Injector injector;
+ ActorSystemManager actorSystemManager;
+ QakkaFig qakkaFig;
+
+
+ @Inject
+ public QueueActorRouterProducer(
+ Injector injector,
+ ActorSystemManager actorSystemManager,
+ QakkaFig qakkaFig) {
+
+ this.injector = injector;
+ this.actorSystemManager = actorSystemManager;
+ this.qakkaFig = qakkaFig;
+ }
+
+
+ @Override
+ public String getRouterPath() {
+ return "/user/queueActorRouterProxy";
+ }
+
+
+ @Override
+ public void produceRouter(ActorSystem system, String role) {
+
+ ClusterSingletonManagerSettings settings =
+ ClusterSingletonManagerSettings.create( system ).withRole( "io" );
+
+ system.actorOf( ClusterSingletonManager.props(
+ Props.create( GuiceActorProducer.class, injector, QueueActorRouter.class ),
+ PoisonPill.getInstance(), settings ), "queueActorRouter" );
+
+ ClusterSingletonProxySettings proxySettings =
+ ClusterSingletonProxySettings.create( system ).withRole( role );
+
+ system.actorOf(
+ ClusterSingletonProxy.props( "/user/queueActorRouter", proxySettings ), "queueActorRouterProxy" );
+ }
+
+
+ @Override
+ public void addConfiguration(Map<String, Object> configMap) {
+
+ int numInstancesPerNode = qakkaFig.getNumQueueActors();
+
+ Map<String, Object> akka = (Map<String, Object>) configMap.get( "akka" );
+ final Map<String, Object> deploymentMap;
+
+ if ( akka.get( "actor" ) == null ) {
+ deploymentMap = new HashMap<>();
+ akka.put( "actor", new HashMap<String, Object>() {{
+ put( "deployment", deploymentMap );
+ }} );
+
+ } else if (((Map) akka.get( "actor" )).get( "deployment" ) == null) {
+ deploymentMap = new HashMap<>();
+ ((Map) akka.get( "actor" )).put( "deployment", deploymentMap );
+
+ } else {
+ deploymentMap = (Map<String, Object>) ((Map) akka.get( "actor" )).get( "deployment" );
+ }
+
+ deploymentMap.put( "/queueActorRouter/singleton/router", new HashMap<String, Object>() {{
+ put( "router", "consistent-hashing-pool" );
+ put( "cluster", new HashMap<String, Object>() {{
+ put( "enabled", "on" );
+ put( "allow-local-routees", "on" );
+ put( "use-role", "io" );
+ put( "max-nr-of-instances-per-node", numInstancesPerNode );
+ put( "failure-detector", new HashMap<String, Object>() {{
+ put( "threshold", "10" );
+ put( "acceptable-heartbeat-pause", "3 s" );
+ put( "heartbeat-interval", "1 s" );
+ put( "heartbeat-request", new HashMap<String, Object>() {{
+ put( "expected-response-after", "3 s" );
+ }} );
+ }} );
+ }} );
+ }} );
+
+ }
+
+
+ @Override
+ public Collection<Class> getMessageTypes() {
+ return new ArrayList() {{
+ add( QueueGetRequest.class );
+ add( QueueAckRequest.class );
+ add( QueueInitRequest.class );
+ add( QueueRefreshRequest.class );
+ add( QueueTimeoutRequest.class );
+ add( ShardCheckRequest.class );
+ }};
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java
new file mode 100644
index 0000000..885a559
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.impl;
+
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.cluster.singleton.ClusterSingletonManager;
+import akka.cluster.singleton.ClusterSingletonManagerSettings;
+import akka.cluster.singleton.ClusterSingletonProxy;
+import akka.cluster.singleton.ClusterSingletonProxySettings;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
+import org.apache.usergrid.persistence.actorsystem.RouterProducer;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.distributed.actors.QueueSenderRouter;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendRequest;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class QueueSenderRouterProducer implements RouterProducer {
+
+ static Injector injector;
+ ActorSystemManager actorSystemManager;
+ QakkaFig qakkaFig;
+
+
+ @Inject
+ public QueueSenderRouterProducer(
+ Injector injector,
+ ActorSystemManager actorSystemManager,
+ QakkaFig qakkaFig) {
+
+ this.injector = injector;
+ this.actorSystemManager = actorSystemManager;
+ this.qakkaFig = qakkaFig;
+ }
+
+
+ @Override
+ public String getRouterPath() {
+ return "/user/queueSenderRouterProxy";
+ }
+
+
+ @Override
+ public void produceRouter(ActorSystem system, String role) {
+
+ ClusterSingletonManagerSettings settings =
+ ClusterSingletonManagerSettings.create( system ).withRole( "io" );
+
+ system.actorOf( ClusterSingletonManager.props(
+ Props.create( GuiceActorProducer.class, injector, QueueSenderRouter.class ),
+ PoisonPill.getInstance(), settings ), "queueSenderRouter" );
+
+ ClusterSingletonProxySettings proxySettings =
+ ClusterSingletonProxySettings.create( system ).withRole( role );
+
+ system.actorOf(
+ ClusterSingletonProxy.props( "/user/queueSenderRouter", proxySettings ), "queueSenderRouterProxy" );
+ }
+
+
+ @Override
+ public void addConfiguration(Map<String, Object> configMap) {
+
+ int numInstancesPerNode = qakkaFig.getNumQueueSenderActors();
+
+ Map<String, Object> akka = (Map<String, Object>) configMap.get( "akka" );
+ final Map<String, Object> deploymentMap;
+
+ if ( akka.get( "actor" ) == null ) {
+ deploymentMap = new HashMap<>();
+ akka.put( "actor", new HashMap<String, Object>() {{
+ put( "deployment", deploymentMap );
+ }} );
+
+ } else if (((Map) akka.get( "actor" )).get( "deployment" ) == null) {
+ deploymentMap = new HashMap<>();
+ ((Map) akka.get( "actor" )).put( "deployment", deploymentMap );
+
+ } else {
+ deploymentMap = (Map<String, Object>) ((Map) akka.get( "actor" )).get( "deployment" );
+ }
+
+ deploymentMap.put( "/queueSenderRouter/singleton/router", new HashMap<String, Object>() {{
+ put( "router", "round-robin-pool" );
+ put( "cluster", new HashMap<String, Object>() {{
+ put( "enabled", "on" );
+ put( "allow-local-routees", "on" );
+ put( "use-role", "io" );
+ put( "max-nr-of-instances-per-node", numInstancesPerNode );
+ put( "failure-detector", new HashMap<String, Object>() {{
+ put( "threshold", "10" );
+ put( "acceptable-heartbeat-pause", "3 s" );
+ put( "heartbeat-interval", "1 s" );
+ put( "heartbeat-request", new HashMap<String, Object>() {{
+ put( "expected-response-after", "3 s" );
+ }} );
+ }} );
+ }} );
+ }} );
+
+ }
+
+ @Override
+ public Collection<Class> getMessageTypes() {
+ return Collections.singletonList( QueueSendRequest.class );
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
new file mode 100644
index 0000000..c8b5567
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.impl;
+
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.cluster.singleton.ClusterSingletonManager;
+import akka.cluster.singleton.ClusterSingletonManagerSettings;
+import akka.cluster.singleton.ClusterSingletonProxy;
+import akka.cluster.singleton.ClusterSingletonProxySettings;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
+import org.apache.usergrid.persistence.actorsystem.RouterProducer;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.distributed.actors.QueueWriterRouter;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class QueueWriterRouterProducer implements RouterProducer {
+
+ static Injector injector;
+ ActorSystemManager actorSystemManager;
+ QakkaFig qakkaFig;
+
+
+ @Inject
+ public QueueWriterRouterProducer(
+ Injector injector,
+ ActorSystemManager actorSystemManager,
+ QakkaFig qakkaFig) {
+
+ this.injector = injector;
+ this.actorSystemManager = actorSystemManager;
+ this.qakkaFig = qakkaFig;
+ }
+
+
+ @Override
+ public String getRouterPath() {
+ return "/user/queueWriterRouterProxy";
+ }
+
+
+ @Override
+ public void produceRouter(ActorSystem system, String role) {
+
+ ClusterSingletonManagerSettings settings =
+ ClusterSingletonManagerSettings.create( system ).withRole( "io" );
+
+ system.actorOf( ClusterSingletonManager.props(
+ Props.create( GuiceActorProducer.class, injector, QueueWriterRouter.class ),
+ PoisonPill.getInstance(), settings ), "queueWriterRouter" );
+
+ ClusterSingletonProxySettings proxySettings =
+ ClusterSingletonProxySettings.create( system ).withRole( role );
+
+ system.actorOf(
+ ClusterSingletonProxy.props( "/user/queueWriterRouter", proxySettings ), "queueWriterRouterProxy" );
+ }
+
+
+ @Override
+ public void addConfiguration(Map<String, Object> configMap) {
+
+ int numInstancesPerNode = qakkaFig.getNumQueueWriterActors();
+
+ Map<String, Object> akka = (Map<String, Object>) configMap.get( "akka" );
+ final Map<String, Object> deploymentMap;
+
+ if ( akka.get( "actor" ) == null ) {
+ deploymentMap = new HashMap<>();
+ akka.put( "actor", new HashMap<String, Object>() {{
+ put( "deployment", deploymentMap );
+ }} );
+
+ } else if (((Map) akka.get( "actor" )).get( "deployment" ) == null) {
+ deploymentMap = new HashMap<>();
+ ((Map) akka.get( "actor" )).put( "deployment", deploymentMap );
+
+ } else {
+ deploymentMap = (Map<String, Object>) ((Map) akka.get( "actor" )).get( "deployment" );
+ }
+
+ deploymentMap.put( "/queueWriterRouter/singleton/router", new HashMap<String, Object>() {{
+ put( "router", "round-robin-pool" );
+ put( "cluster", new HashMap<String, Object>() {{
+ put( "enabled", "on" );
+ put( "allow-local-routees", "on" );
+ put( "use-role", "io" );
+ put( "max-nr-of-instances-per-node", numInstancesPerNode );
+ put( "failure-detector", new HashMap<String, Object>() {{
+ put( "threshold", "10" );
+ put( "acceptable-heartbeat-pause", "3 s" );
+ put( "heartbeat-interval", "1 s" );
+ put( "heartbeat-request", new HashMap<String, Object>() {{
+ put( "expected-response-after", "3 s" );
+ }} );
+ }} );
+ }} );
+ }} );
+
+ }
+
+ @Override
+ public Collection<Class> getMessageTypes() {
+ return Collections.singletonList( QueueWriteRequest.class );
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java
new file mode 100644
index 0000000..a1bbf14
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import java.io.Serializable;
+
+/**
+ * Marker interface
+ */
+public interface QakkaMessage extends Serializable {
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckRequest.java
new file mode 100644
index 0000000..4beb46b
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckRequest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+import java.util.UUID;
+
+
+public class QueueAckRequest implements QakkaMessage {
+ private final String queueName;
+ private final UUID queueMessageId;
+
+
+ public QueueAckRequest( String queueName, UUID queueMessageId ) {
+ this.queueName = queueName;
+ this.queueMessageId = queueMessageId;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public UUID getQueueMessageId() {
+ return queueMessageId;
+ }
+
+ public String toString() {
+ return new ToStringBuilder( this )
+ .append( "queueName", queueName )
+ .append( "queueMessageId", queueMessageId )
+ .toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckResponse.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckResponse.java
new file mode 100644
index 0000000..68e6213
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckResponse.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+
+import java.util.UUID;
+
+
+public class QueueAckResponse implements QakkaMessage {
+ private final String queueName;
+ private final UUID messageId;
+ private final DistributedQueueService.Status status;
+
+ public QueueAckResponse( String queueName, UUID messageId, DistributedQueueService.Status status ) {
+ this.queueName = queueName;
+ this.messageId = messageId;
+ this.status = status;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public UUID getMessageId() {
+ return messageId;
+ }
+
+ public DistributedQueueService.Status getStatus() {
+ return status;
+ }
+
+ public String toString() {
+ return new ToStringBuilder( this )
+ .append( "queueName", queueName )
+ .append( "messageId", messageId )
+ .append( "status", status )
+ .toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetRequest.java
new file mode 100644
index 0000000..c23dcf6
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetRequest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+public class QueueGetRequest implements QakkaMessage {
+ private final String queueName;
+ private final int numRequested;
+
+ public QueueGetRequest(String queueName, int numRequested ) {
+ this.queueName = queueName;
+ this.numRequested = numRequested;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public int getNumRequested() {
+ return numRequested;
+ }
+
+ public String toString() {
+ return new ToStringBuilder( this )
+ .append( "queueName", queueName )
+ .append( "numRequested", numRequested )
+ .toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java
new file mode 100644
index 0000000..c8004fb
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+
+import java.util.Collection;
+import java.util.Collections;
+
+
+public class QueueGetResponse implements QakkaMessage {
+ private final Collection<DatabaseQueueMessage> queueMessages;
+ private final DistributedQueueService.Status status;
+
+
+ public QueueGetResponse(DistributedQueueService.Status status ) {
+ this.status = status;
+ this.queueMessages = Collections.emptyList();
+ }
+
+ public QueueGetResponse(DistributedQueueService.Status status, Collection<DatabaseQueueMessage> queueMessages) {
+ this.status = status;
+ this.queueMessages = queueMessages;
+ }
+
+ public DistributedQueueService.Status getStatus() {
+ return status;
+ }
+
+ public boolean isSuccess() {
+ return status.equals( DistributedQueueService.Status.SUCCESS);
+ }
+
+ public Collection<DatabaseQueueMessage> getQueueMessages() {
+ return queueMessages;
+ }
+
+ public String toString() {
+ return new ToStringBuilder( this )
+ .append( "messageCount", queueMessages.size() )
+ .append( "status", status )
+ .toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueInitRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueInitRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueInitRequest.java
new file mode 100644
index 0000000..10180cd
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueInitRequest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+
+public class QueueInitRequest implements QakkaMessage {
+ private final String queueName;
+
+
+ public QueueInitRequest(String queueName ) {
+ this.queueName = queueName;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public String toString() {
+ return new ToStringBuilder( this )
+ .append( "queueName", queueName )
+ .toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java
new file mode 100644
index 0000000..a81a6fd
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+
+public class QueueRefreshRequest implements QakkaMessage {
+ private final String queueName;
+
+
+ public QueueRefreshRequest(String queueName ) {
+ this.queueName = queueName;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public String toString() {
+ return new ToStringBuilder( this )
+ .append( "queueName", queueName )
+ .toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendRequest.java
new file mode 100644
index 0000000..8a655f4
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendRequest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+import java.util.UUID;
+
+
+public class QueueSendRequest implements QakkaMessage {
+
+ private final String queueName;
+ private final String sourceRegion;
+ private final String destRegion;
+ private final UUID messageId;
+ private Long deliveryTime;
+ private Long expirationTime;
+
+
+ public QueueSendRequest(
+ String queueName, String sourceRegion, String destRegion, UUID messageId,
+ Long deliveryTime, Long expirationTime) {
+
+ this.queueName = queueName;
+ this.sourceRegion = sourceRegion;
+ this.destRegion = destRegion;
+ this.messageId = messageId;
+ this.deliveryTime = deliveryTime;
+ this.expirationTime = expirationTime;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public String getSourceRegion() {
+ return sourceRegion;
+ }
+
+ public String getDestRegion() {
+ return destRegion;
+ }
+
+ public UUID getMessageId() {
+ return messageId;
+ }
+
+ public Long getExpirationTime() {
+ return expirationTime;
+ }
+
+ public Long getDeliveryTime() {
+ return deliveryTime;
+ }
+
+ public String toString() {
+ return new ToStringBuilder( this )
+ .append( "queueName", queueName )
+ .append( "sourceRegion", sourceRegion )
+ .append( "destRegion", destRegion )
+ .append( "messageId", messageId )
+ .append( "expirationTime", expirationTime )
+ .append( "deliveryTime", deliveryTime )
+ .toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java
new file mode 100644
index 0000000..0c295a0
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+
+
+public class QueueSendResponse implements QakkaMessage {
+ private final DistributedQueueService.Status status;
+
+ public QueueSendResponse(DistributedQueueService.Status status) {
+ this.status = status;
+ }
+
+ public DistributedQueueService.Status getSendStatus() {
+ return status;
+ }
+
+ public String toString() {
+ return new ToStringBuilder( this )
+ .append( "status", status )
+ .toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueTimeoutRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueTimeoutRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueTimeoutRequest.java
new file mode 100644
index 0000000..5358459
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueTimeoutRequest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+
+public class QueueTimeoutRequest implements QakkaMessage {
+ private final String queueName;
+
+
+ public QueueTimeoutRequest(String queueName ) {
+ this.queueName = queueName;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public String toString() {
+ return new ToStringBuilder( this )
+ .append( "queueName", queueName )
+ .toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteRequest.java
new file mode 100644
index 0000000..c7411de
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteRequest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+import java.util.UUID;
+
+
+public class QueueWriteRequest implements QakkaMessage {
+
+ private final String queueName;
+ private final String sourceRegion;
+ private final String destRegion;
+ private final UUID messageId;
+ private Long deliveryTime;
+ private Long expirationTime;
+
+
+ public QueueWriteRequest(
+ String queueName, String sourceRegion, String destRegion, UUID messageId,
+ Long deliveryTime, Long expirationTime) {
+
+ this.queueName = queueName;
+ this.sourceRegion = sourceRegion;
+ this.destRegion = destRegion;
+ this.messageId = messageId;
+ this.deliveryTime = deliveryTime;
+ this.expirationTime = expirationTime;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public String getSourceRegion() {
+ return sourceRegion;
+ }
+
+ public String getDestRegion() {
+ return destRegion;
+ }
+
+ public UUID getMessageId() {
+ return messageId;
+ }
+
+ public Long getExpirationTime() {
+ return expirationTime;
+ }
+
+ public Long getDeliveryTime() {
+ return deliveryTime;
+ }
+
+ public String toString() {
+ return new ToStringBuilder( this )
+ .append( "queueName", queueName )
+ .append( "sourceRegion", sourceRegion )
+ .append( "destRegion", destRegion )
+ .append( "messageId", messageId )
+ .append( "expirationTime", expirationTime )
+ .append( "deliveryTime", deliveryTime )
+ .toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java
new file mode 100644
index 0000000..1eb513c
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.usergrid.persistence.qakka.distributed.actors.QueueWriter;
+
+
+public class QueueWriteResponse implements QakkaMessage {
+ private final QueueWriter.WriteStatus status;
+
+ public QueueWriteResponse(QueueWriter.WriteStatus status) {
+ this.status = status;
+ }
+
+ public QueueWriter.WriteStatus getSendStatus() {
+ return status;
+ }
+
+ public String toString() {
+ return new ToStringBuilder( this )
+ .append( "status", status )
+ .toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/ShardCheckRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/ShardCheckRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/ShardCheckRequest.java
new file mode 100644
index 0000000..78dbe13
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/ShardCheckRequest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+
+public class ShardCheckRequest implements QakkaMessage {
+ private final String queueName;
+
+
+ public ShardCheckRequest(String queueName ) {
+ this.queueName = queueName;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public String toString() {
+ return new ToStringBuilder( this )
+ .append( "queueName", queueName )
+ .toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/BadRequestException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/BadRequestException.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/BadRequestException.java
new file mode 100644
index 0000000..500fb6a
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/BadRequestException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.exceptions;
+
+
+public class BadRequestException extends QakkaRuntimeException {
+
+ public BadRequestException(String message) {
+ super( message );
+ }
+
+ public BadRequestException(String message, Throwable cause) {
+ super( message, cause );
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/NotFoundException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/NotFoundException.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/NotFoundException.java
new file mode 100644
index 0000000..5f76163
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/NotFoundException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.exceptions;
+
+
+public class NotFoundException extends QakkaRuntimeException {
+
+ public NotFoundException(String message) {
+ super( message );
+ }
+
+ public NotFoundException(String message, Throwable cause) {
+ super( message, cause );
+ }
+}