You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/16 22:34:40 UTC

[11/25] usergrid git commit: Initial integration of Qakka into Usergrid Queue module, and implementation of Qakka-based LegacyQueueManager implementation.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
new file mode 100644
index 0000000..fcd2161
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import akka.actor.UntypedActor;
+import com.codahale.metrics.Timer;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueTimeoutRequest;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.UUID;
+
+
+public class QueueTimeouter extends UntypedActor {
+    private static final Logger logger = LoggerFactory.getLogger( QueueTimeouter.class );
+
+    private final String                    queueName;
+
+    private final QueueMessageSerialization messageSerialization;
+    private final MetricsService            metricsService;
+    private final ActorSystemFig            actorSystemFig;
+    private final QakkaFig qakkaFig;
+    private final CassandraClient           cassandraClient;
+
+
+    public QueueTimeouter(String queueName ) {
+        this.queueName = queueName;
+
+        Injector injector = App.INJECTOR;
+
+        messageSerialization = injector.getInstance( QueueMessageSerialization.class );
+        actorSystemFig       = injector.getInstance( ActorSystemFig.class );
+        qakkaFig             = injector.getInstance( QakkaFig.class );
+        metricsService       = injector.getInstance( MetricsService.class );
+        cassandraClient      = injector.getInstance( CassandraClientImpl.class );
+    }
+
+
+    @Override
+    public void onReceive(Object message) {
+
+        if ( message instanceof QueueTimeoutRequest) {
+
+            Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.TIMEOUT_TIME).time();
+
+            try {
+
+                QueueTimeoutRequest request = (QueueTimeoutRequest) message;
+
+                if (!request.getQueueName().equals( queueName )) {
+                    throw new QakkaRuntimeException(
+                            "QueueTimeouter for " + queueName + ": Incorrect queueName " + request.getQueueName() );
+                }
+
+                //logger.debug("Processing timeouts for queue {} ", queueName );
+
+                int count = 0;
+                String region = actorSystemFig.getRegionLocal();
+
+                ShardIterator shardIterator = new ShardIterator(
+                        cassandraClient, queueName, region, Shard.Type.INFLIGHT, Optional.empty());
+
+                MultiShardMessageIterator multiShardIteratorInflight = new MultiShardMessageIterator(
+                        cassandraClient, queueName, region, DatabaseQueueMessage.Type.INFLIGHT, shardIterator, null);
+
+                while ( multiShardIteratorInflight.hasNext() ) {
+
+                    DatabaseQueueMessage queueMessage = multiShardIteratorInflight.next();
+
+                    long currentTime = System.currentTimeMillis();
+
+                    if ((currentTime - queueMessage.getInflightAt()) > qakkaFig.getQueueTimeoutSeconds() * 1000) {
+
+                        // put message back in messages_available table as new queue message with new UUID
+
+                        UUID newQueueMessageId = QakkaUtils.getTimeUuid();
+
+                        DatabaseQueueMessage newMessage = new DatabaseQueueMessage(
+                                queueMessage.getMessageId(),
+                                DatabaseQueueMessage.Type.DEFAULT,
+                                queueMessage.getQueueName(),
+                                queueMessage.getRegion(),
+                                null,
+                                queueMessage.getQueuedAt(),
+                                queueMessage.getInflightAt(),
+                                newQueueMessageId );
+
+                        messageSerialization.writeMessage( newMessage );
+
+                        // remove message from inflight table
+
+                        messageSerialization.deleteMessage(
+                                queueName,
+                                actorSystemFig.getRegionLocal(),
+                                null,
+                                DatabaseQueueMessage.Type.INFLIGHT,
+                                queueMessage.getQueueMessageId() );
+
+                        count++;
+                    }
+                }
+
+                if (count > 0) {
+                    logger.debug( "Timed out {} messages for queue {}", count, queueName );
+                }
+
+            } finally {
+                timer.close();
+            }
+
+        } else {
+            unhandled( message );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
new file mode 100644
index 0000000..6c91eb0
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import akka.actor.UntypedActor;
+import com.codahale.metrics.Timer;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResponse;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+
+public class QueueWriter extends UntypedActor {
+    private static final Logger logger = LoggerFactory.getLogger( QueueWriter.class );
+
+    public enum WriteStatus { SUCCESS_XFERLOG_DELETED, SUCCESS_XFERLOG_NOTDELETED, ERROR };
+
+    private final QueueMessageSerialization messageSerialization;
+    private final TransferLogSerialization  transferLogSerialization;
+    private final AuditLogSerialization     auditLogSerialization;
+    private final MetricsService            metricsService;
+
+
+    public QueueWriter() {
+
+        Injector injector = App.INJECTOR;
+
+        messageSerialization     = injector.getInstance( QueueMessageSerialization.class );
+        transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
+        auditLogSerialization    = injector.getInstance( AuditLogSerialization.class );
+        metricsService           = injector.getInstance( MetricsService.class );
+    }
+
+    @Override
+    public void onReceive(Object message) {
+
+            if (message instanceof QueueWriteRequest) {
+
+                Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_WRITE ).time();
+
+                try {
+                    QueueWriteRequest qa = (QueueWriteRequest) message;
+
+                    UUID queueMessageId = QakkaUtils.getTimeUuid();
+
+                    // TODO: implement deliveryTime and expirationTime
+
+                    DatabaseQueueMessage dbqm = null;
+                    long currentTime = System.currentTimeMillis();
+
+                    try {
+                        dbqm = new DatabaseQueueMessage(
+                                qa.getMessageId(),
+                                DatabaseQueueMessage.Type.DEFAULT,
+                                qa.getQueueName(),
+                                qa.getDestRegion(),
+                                null,
+                                currentTime,
+                                currentTime,
+                                queueMessageId );
+
+                        messageSerialization.writeMessage( dbqm );
+
+                        //logger.debug("Wrote queue message id {} to queue name {}",
+                        //        dbqm.getQueueMessageId(), dbqm.getQueueName());
+
+                    } catch (Throwable t) {
+                        logger.debug("Error creating database queue message", t);
+
+                        auditLogSerialization.recordAuditLog(
+                                AuditLog.Action.SEND,
+                                AuditLog.Status.ERROR,
+                                qa.getQueueName(),
+                                qa.getDestRegion(),
+                                qa.getMessageId(),
+                                dbqm.getMessageId() );
+
+                        getSender().tell( new QueueWriteResponse(
+                                QueueWriter.WriteStatus.ERROR ), getSender() );
+
+                        return;
+                    }
+
+                    auditLogSerialization.recordAuditLog(
+                            AuditLog.Action.SEND,
+                            AuditLog.Status.SUCCESS,
+                            qa.getQueueName(),
+                            qa.getDestRegion(),
+                            qa.getMessageId(),
+                            dbqm.getQueueMessageId() );
+
+                    try {
+                        transferLogSerialization.removeTransferLog(
+                                qa.getQueueName(),
+                                qa.getSourceRegion(),
+                                qa.getDestRegion(),
+                                qa.getMessageId() );
+
+                        getSender().tell( new QueueWriteResponse(
+                                QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED ), getSender() );
+
+                    } catch (Throwable e) {
+                        logger.error("Error deleting transferlog", e);
+                        logger.debug( "Unable to delete transfer log for {} {} {} {}",
+                                qa.getQueueName(),
+                                qa.getSourceRegion(),
+                                qa.getDestRegion(),
+                                qa.getMessageId() );
+
+                        getSender().tell( new QueueWriteResponse(
+                                QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ), getSender() );
+                    }
+
+                } finally {
+                    timer.close();
+                }
+
+            } else {
+                unhandled( message );
+            }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
new file mode 100644
index 0000000..9cf06d9
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.routing.FromConfig;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
+
+
+/**
+ * Route messages to QueueWriters
+ */
+public class QueueWriterRouter extends UntypedActor {
+
+    private final ActorRef router;
+
+
+    public QueueWriterRouter() {
+
+        router = getContext().actorOf(
+                FromConfig.getInstance().props(Props.create(QueueWriter.class )), "router");
+    }
+
+    @Override
+    public void onReceive(Object message) {
+
+        if ( message instanceof QueueWriteRequest) {
+            router.tell( message, getSender() );
+
+        } else {
+            unhandled(message);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
new file mode 100644
index 0000000..46e4906
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+
+import akka.actor.UntypedActor;
+import com.codahale.metrics.Timer;
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.distributed.messages.ShardCheckRequest;
+import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardCounterSerializationImpl;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardSerializationImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.UUID;
+
+
+public class ShardAllocator extends UntypedActor {
+    private static final Logger logger = LoggerFactory.getLogger( ShardAllocator.class );
+
+    private final String queueName;
+
+    private final QakkaFig qakkaFig;
+    private final ActorSystemFig            actorSystemFig;
+    private final ShardSerialization        shardSerialization;
+    private final ShardCounterSerialization shardCounterSerialization;
+    private final MetricsService            metricsService;
+    private final CassandraClient           cassandraClient;
+
+
+    public ShardAllocator( String queueName ) {
+        this.queueName = queueName;
+
+        Injector injector = App.INJECTOR;
+
+        this.qakkaFig                  = injector.getInstance( QakkaFig.class );
+        this.shardCounterSerialization = injector.getInstance( ShardCounterSerializationImpl.class );
+        this.shardSerialization        = injector.getInstance( ShardSerializationImpl.class );
+        this.actorSystemFig            = injector.getInstance( ActorSystemFig.class );
+        this.metricsService            = injector.getInstance( MetricsService.class );
+        this.cassandraClient           = injector.getInstance( CassandraClientImpl.class );
+
+        logger.debug( "Created shard allocator for queue {}", queueName );
+    }
+
+
+    @Override
+    public void onReceive( Object message ) throws Exception {
+
+        if ( message instanceof ShardCheckRequest) {
+
+            ShardCheckRequest request = (ShardCheckRequest) message;
+
+            if (!request.getQueueName().equals( queueName )) {
+                throw new QakkaRuntimeException(
+                        "ShardAllocator for " + queueName + ": Incorrect queueName " + request.getQueueName() );
+            }
+
+            // check both types of shard
+            checkLatestShard( Shard.Type.DEFAULT );
+            checkLatestShard( Shard.Type.INFLIGHT );
+
+        } else {
+            unhandled( message );
+        }
+
+    }
+
+    private void checkLatestShard( Shard.Type type ) {
+
+        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ALLOCATE_TIME).time();
+
+        try {
+
+            String region = actorSystemFig.getRegionLocal();
+
+            // find newest shard
+
+            ShardIterator shardIterator = new ShardIterator(
+                    cassandraClient, queueName, region, type, Optional.empty() );
+
+            Shard shard = null;
+            while (shardIterator.hasNext()) {
+                shard = shardIterator.next();
+            }
+
+            if (shard == null) {
+                logger.warn( "No shard found for {}, {}, {}", queueName, region, type );
+                return;
+            }
+
+            // if its count is greater than 90% of max shard size, then allocate a new shard
+
+            long counterValue = 0;
+            try {
+                counterValue = shardCounterSerialization.getCounterValue( queueName, type, shard.getShardId() );
+            } catch ( NotFoundException ignored ) {}
+
+            if (counterValue > (0.9 * qakkaFig.getMaxShardSize())) {
+
+                // Create UUID from a UNIX timestamp via DataStax utility
+                // https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/utils/UUIDs.html
+                UUID futureUUID = UUIDs.startOf(
+                        System.currentTimeMillis() + qakkaFig.getShardAllocationAdvanceTimeMillis());
+
+                Shard newShard = new Shard( queueName, region, type, shard.getShardId() + 1, futureUUID );
+                shardSerialization.createShard( newShard );
+                shardCounterSerialization.incrementCounter( queueName, type, newShard.getShardId(), 0 );
+
+                logger.info("{} Created new shard for queue {} shardId {} timestamp {} counterValue {}",
+                        this.hashCode(), queueName, shard.getShardId(), futureUUID.timestamp(), counterValue );
+            }
+
+        } catch ( Throwable t ) {
+            logger.error("Error while checking shard allocations", t);
+
+        } finally {
+            timer.close();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
new file mode 100644
index 0000000..9551c61
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.impl;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.QueueManager;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.distributed.messages.*;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+
+@Singleton
+public class DistributedQueueServiceImpl implements DistributedQueueService {
+
+    private static final Logger logger = LoggerFactory.getLogger( DistributedQueueServiceImpl.class );
+
+    private final ActorSystemManager actorSystemManager;
+    private final QueueManager queueManager;
+    private final QakkaFig qakkaFig;
+
+
+    @Inject
+    public DistributedQueueServiceImpl(
+            ActorSystemManager actorSystemManager,
+            QueueManager queueManager,
+            QakkaFig qakkaFig,
+            QueueActorRouterProducer  queueActorRouterProducer,
+            QueueWriterRouterProducer queueWriterRouterProducer,
+            QueueSenderRouterProducer queueSenderRouterProducer ) {
+
+        this.actorSystemManager = actorSystemManager;
+        this.queueManager = queueManager;
+        this.qakkaFig = qakkaFig;
+
+        actorSystemManager.registerRouterProducer( queueActorRouterProducer );
+        actorSystemManager.registerRouterProducer( queueWriterRouterProducer );
+        actorSystemManager.registerRouterProducer( queueSenderRouterProducer );
+    }
+
+
+    @Override
+    public void init() {
+        for ( String queueName : queueManager.getListOfQueues() ) {
+            initQueue( queueName );
+        }
+    }
+
+
+    @Override
+    public void initQueue(String queueName) {
+        QueueInitRequest request = new QueueInitRequest( queueName );
+        ActorRef clientActor = actorSystemManager.getClientActor();
+        clientActor.tell( request, null );
+    }
+
+
+    @Override
+    public void refresh() {
+        for ( String queueName : queueManager.getListOfQueues() ) {
+            refreshQueue( queueName );
+        }
+    }
+
+
+    @Override
+    public void refreshQueue(String queueName) {
+        QueueRefreshRequest request = new QueueRefreshRequest( queueName );
+        ActorRef clientActor = actorSystemManager.getClientActor();
+        clientActor.tell( request, null );
+    }
+
+
+    @Override
+    public void processTimeouts() {
+
+        for ( String queueName : queueManager.getListOfQueues() ) {
+
+            QueueTimeoutRequest request = new QueueTimeoutRequest( queueName );
+
+            ActorRef clientActor = actorSystemManager.getClientActor();
+            clientActor.tell( request, null );
+        }
+    }
+
+
+    @Override
+    public DistributedQueueService.Status sendMessageToRegion(
+            String queueName, String sourceRegion, String destRegion, UUID messageId,
+            Long deliveryTime, Long expirationTime ) {
+
+        List<String> queueNames = queueManager.getListOfQueues();
+        if ( !queueNames.contains( queueName ) ) {
+            throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+        }
+
+        int maxRetries = qakkaFig.getMaxSendRetries();
+        int retries = 0;
+
+        QueueSendRequest request = new QueueSendRequest(
+                queueName, sourceRegion, destRegion, messageId, deliveryTime, expirationTime );
+
+        while ( retries++ < maxRetries ) {
+            try {
+                Timeout t = new Timeout( qakkaFig.getSendTimeoutSeconds(), TimeUnit.SECONDS );
+
+                // send to current region via local clientActor
+                ActorRef clientActor = actorSystemManager.getClientActor();
+                Future<Object> fut = Patterns.ask( clientActor, request, t );
+
+                // wait for response...
+                final Object response = Await.result( fut, t.duration() );
+
+                if ( response != null && response instanceof QueueSendResponse) {
+                    QueueSendResponse qarm = (QueueSendResponse)response;
+
+                    if ( !DistributedQueueService.Status.ERROR.equals( qarm.getSendStatus() )) {
+
+                        if ( retries > 1 ) {
+                            logger.debug("SUCCESS after {} retries", retries );
+                        }
+                        return qarm.getSendStatus();
+
+                    } else {
+                        logger.debug("ERROR STATUS sending to queue, retrying {}", retries );
+                    }
+
+                } else if ( response != null  ) {
+                    logger.debug("NULL RESPONSE sending to queue, retrying {}", retries );
+
+                } else {
+                    logger.debug("TIMEOUT sending to queue, retrying {}", retries );
+                }
+
+            } catch ( Exception e ) {
+                logger.debug("ERROR sending to queue, retrying " + retries, e );
+            }
+        }
+
+        throw new QakkaRuntimeException( "Error sending to queue after " + retries );
+    }
+
+
+    @Override
+    public Collection<DatabaseQueueMessage> getNextMessages( String queueName, int count ) {
+
+        List<String> queueNames = queueManager.getListOfQueues();
+        if ( !queueNames.contains( queueName ) ) {
+            throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+        }
+
+        int maxRetries = qakkaFig.getMaxGetRetries();
+        int retries = 0;
+
+        QueueGetRequest request = new QueueGetRequest( queueName, count );
+        while ( retries++ < maxRetries ) {
+            try {
+                Timeout t = new Timeout( qakkaFig.getGetTimeoutSeconds(), TimeUnit.SECONDS );
+
+                // ask ClientActor and wait (up to timeout) for response
+
+                Future<Object> fut = Patterns.ask( actorSystemManager.getClientActor(), request, t );
+                final QakkaMessage response = (QakkaMessage)Await.result( fut, t.duration() );
+
+                if ( response != null && response instanceof QueueGetResponse) {
+                    QueueGetResponse qprm = (QueueGetResponse)response;
+                    if ( qprm.isSuccess() ) {
+                        if (retries > 1) {
+                            logger.debug( "getNextMessage SUCCESS after {} retries", retries );
+                        }
+                    }
+                    return qprm.getQueueMessages();
+
+
+                } else if ( response != null  ) {
+                    logger.debug("ERROR RESPONSE popping queue, retrying {}", retries );
+
+                } else {
+                    logger.debug("TIMEOUT popping to queue, retrying {}", retries );
+                }
+
+            } catch ( Exception e ) {
+                logger.debug("ERROR popping to queue, retrying " + retries, e );
+            }
+        }
+
+        throw new QakkaRuntimeException(
+                "Error getting from queue " + queueName + " after " + retries );
+    }
+
+
+    @Override
+    public Status ackMessage(String queueName, UUID queueMessageId ) {
+
+        List<String> queueNames = queueManager.getListOfQueues();
+        if ( !queueNames.contains( queueName ) ) {
+            return Status.BAD_REQUEST;
+        }
+
+        QueueAckRequest message = new QueueAckRequest( queueName, queueMessageId );
+        return sendMessageToLocalQueueActors( message );
+    }
+
+
+    @Override
+    public Status requeueMessage(String queueName, UUID messageId) {
+
+        List<String> queueNames = queueManager.getListOfQueues();
+        if ( !queueNames.contains( queueName ) ) {
+            throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+        }
+
+        QueueAckRequest message = new QueueAckRequest( queueName, messageId );
+        return sendMessageToLocalQueueActors( message );
+    }
+
+
+    @Override
+    public Status clearMessages(String queueName) {
+
+        List<String> queueNames = queueManager.getListOfQueues();
+        if ( !queueNames.contains( queueName ) ) {
+            throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+        }
+
+        // TODO: implement clear queue
+        throw new UnsupportedOperationException();
+    }
+
+
+    private Status sendMessageToLocalQueueActors( QakkaMessage message ) {
+
+        int maxRetries = 5;
+        int retries = 0;
+
+        while ( retries++ < maxRetries ) {
+            try {
+                Timeout t = new Timeout( 1, TimeUnit.SECONDS );
+
+                // ask ClientActor and wait (up to timeout) for response
+
+                Future<Object> fut = Patterns.ask( actorSystemManager.getClientActor(), message, t );
+                final QakkaMessage response = (QakkaMessage)Await.result( fut, t.duration() );
+
+                if ( response != null && response instanceof QueueAckResponse) {
+                    QueueAckResponse qprm = (QueueAckResponse)response;
+                    return qprm.getStatus();
+
+                } else if ( response != null  ) {
+                    logger.debug("ERROR RESPONSE sending message, retrying {}", retries );
+
+                } else {
+                    logger.debug("TIMEOUT sending message, retrying {}", retries );
+                }
+
+            } catch ( Exception e ) {
+                logger.debug("ERROR sending message, retrying " + retries, e );
+            }
+        }
+
+        throw new QakkaRuntimeException(
+                "Error sending message " + message + "after " + retries );
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
new file mode 100644
index 0000000..8c6adda
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.impl;
+
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.cluster.singleton.ClusterSingletonManager;
+import akka.cluster.singleton.ClusterSingletonManagerSettings;
+import akka.cluster.singleton.ClusterSingletonProxy;
+import akka.cluster.singleton.ClusterSingletonProxySettings;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
+import org.apache.usergrid.persistence.actorsystem.RouterProducer;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.distributed.actors.QueueActorRouter;
+import org.apache.usergrid.persistence.qakka.distributed.messages.*;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class QueueActorRouterProducer implements RouterProducer {
+
+    static Injector injector;
+    ActorSystemManager actorSystemManager;
+    QakkaFig qakkaFig;
+
+
+    @Inject
+    public QueueActorRouterProducer(
+            Injector injector,
+            ActorSystemManager actorSystemManager,
+            QakkaFig qakkaFig) {
+
+        this.injector = injector;
+        this.actorSystemManager = actorSystemManager;
+        this.qakkaFig = qakkaFig;
+    }
+
+
+    @Override
+    public String getRouterPath() {
+        return "/user/queueActorRouterProxy";
+    }
+
+
+    @Override
+    public void produceRouter(ActorSystem system, String role) {
+
+        ClusterSingletonManagerSettings settings =
+                ClusterSingletonManagerSettings.create( system ).withRole( "io" );
+
+        system.actorOf( ClusterSingletonManager.props(
+                Props.create( GuiceActorProducer.class, injector, QueueActorRouter.class ),
+                PoisonPill.getInstance(), settings ), "queueActorRouter" );
+
+        ClusterSingletonProxySettings proxySettings =
+                ClusterSingletonProxySettings.create( system ).withRole( role );
+
+        system.actorOf(
+                ClusterSingletonProxy.props( "/user/queueActorRouter", proxySettings ), "queueActorRouterProxy" );
+    }
+
+
+    @Override
+    public void addConfiguration(Map<String, Object> configMap) {
+
+        int numInstancesPerNode = qakkaFig.getNumQueueActors();
+
+        Map<String, Object> akka = (Map<String, Object>) configMap.get( "akka" );
+        final Map<String, Object> deploymentMap;
+
+        if ( akka.get( "actor" ) == null ) {
+            deploymentMap = new HashMap<>();
+            akka.put( "actor", new HashMap<String, Object>() {{
+                put( "deployment", deploymentMap );
+            }} );
+
+        } else if (((Map) akka.get( "actor" )).get( "deployment" ) == null) {
+            deploymentMap = new HashMap<>();
+            ((Map) akka.get( "actor" )).put( "deployment", deploymentMap );
+
+        } else {
+            deploymentMap = (Map<String, Object>) ((Map) akka.get( "actor" )).get( "deployment" );
+        }
+
+        deploymentMap.put( "/queueActorRouter/singleton/router", new HashMap<String, Object>() {{
+            put( "router", "consistent-hashing-pool" );
+            put( "cluster", new HashMap<String, Object>() {{
+                put( "enabled", "on" );
+                put( "allow-local-routees", "on" );
+                put( "use-role", "io" );
+                put( "max-nr-of-instances-per-node", numInstancesPerNode );
+                put( "failure-detector", new HashMap<String, Object>() {{
+                    put( "threshold", "10" );
+                    put( "acceptable-heartbeat-pause", "3 s" );
+                    put( "heartbeat-interval", "1 s" );
+                    put( "heartbeat-request", new HashMap<String, Object>() {{
+                        put( "expected-response-after", "3 s" );
+                    }} );
+                }} );
+            }} );
+        }} );
+
+    }
+
+
+    @Override
+    public Collection<Class> getMessageTypes() {
+        return new ArrayList() {{
+            add( QueueGetRequest.class );
+            add( QueueAckRequest.class );
+            add( QueueInitRequest.class );
+            add( QueueRefreshRequest.class );
+            add( QueueTimeoutRequest.class );
+            add( ShardCheckRequest.class );
+        }};
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java
new file mode 100644
index 0000000..885a559
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.impl;
+
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.cluster.singleton.ClusterSingletonManager;
+import akka.cluster.singleton.ClusterSingletonManagerSettings;
+import akka.cluster.singleton.ClusterSingletonProxy;
+import akka.cluster.singleton.ClusterSingletonProxySettings;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
+import org.apache.usergrid.persistence.actorsystem.RouterProducer;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.distributed.actors.QueueSenderRouter;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendRequest;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class QueueSenderRouterProducer implements RouterProducer {
+
+    static Injector injector;
+    ActorSystemManager actorSystemManager;
+    QakkaFig qakkaFig;
+
+
+    @Inject
+    public QueueSenderRouterProducer(
+            Injector injector,
+            ActorSystemManager actorSystemManager,
+            QakkaFig qakkaFig) {
+
+        this.injector = injector;
+        this.actorSystemManager = actorSystemManager;
+        this.qakkaFig = qakkaFig;
+    }
+
+
+    @Override
+    public String getRouterPath() {
+        return "/user/queueSenderRouterProxy";
+    }
+
+
+    @Override
+    public void produceRouter(ActorSystem system, String role) {
+
+        ClusterSingletonManagerSettings settings =
+                ClusterSingletonManagerSettings.create( system ).withRole( "io" );
+
+        system.actorOf( ClusterSingletonManager.props(
+                Props.create( GuiceActorProducer.class, injector, QueueSenderRouter.class ),
+                PoisonPill.getInstance(), settings ), "queueSenderRouter" );
+
+        ClusterSingletonProxySettings proxySettings =
+                ClusterSingletonProxySettings.create( system ).withRole( role );
+
+        system.actorOf(
+                ClusterSingletonProxy.props( "/user/queueSenderRouter", proxySettings ), "queueSenderRouterProxy" );
+    }
+
+
+    @Override
+    public void addConfiguration(Map<String, Object> configMap) {
+
+        int numInstancesPerNode = qakkaFig.getNumQueueSenderActors();
+
+        Map<String, Object> akka = (Map<String, Object>) configMap.get( "akka" );
+        final Map<String, Object> deploymentMap;
+
+        if ( akka.get( "actor" ) == null ) {
+            deploymentMap = new HashMap<>();
+            akka.put( "actor", new HashMap<String, Object>() {{
+                put( "deployment", deploymentMap );
+            }} );
+
+        } else if (((Map) akka.get( "actor" )).get( "deployment" ) == null) {
+            deploymentMap = new HashMap<>();
+            ((Map) akka.get( "actor" )).put( "deployment", deploymentMap );
+
+        } else {
+            deploymentMap = (Map<String, Object>) ((Map) akka.get( "actor" )).get( "deployment" );
+        }
+
+        deploymentMap.put( "/queueSenderRouter/singleton/router", new HashMap<String, Object>() {{
+            put( "router", "round-robin-pool" );
+            put( "cluster", new HashMap<String, Object>() {{
+                put( "enabled", "on" );
+                put( "allow-local-routees", "on" );
+                put( "use-role", "io" );
+                put( "max-nr-of-instances-per-node", numInstancesPerNode );
+                put( "failure-detector", new HashMap<String, Object>() {{
+                    put( "threshold", "10" );
+                    put( "acceptable-heartbeat-pause", "3 s" );
+                    put( "heartbeat-interval", "1 s" );
+                    put( "heartbeat-request", new HashMap<String, Object>() {{
+                        put( "expected-response-after", "3 s" );
+                    }} );
+                }} );
+            }} );
+        }} );
+
+    }
+
+    @Override
+    public Collection<Class> getMessageTypes() {
+        return Collections.singletonList( QueueSendRequest.class );
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
new file mode 100644
index 0000000..c8b5567
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.impl;
+
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.cluster.singleton.ClusterSingletonManager;
+import akka.cluster.singleton.ClusterSingletonManagerSettings;
+import akka.cluster.singleton.ClusterSingletonProxy;
+import akka.cluster.singleton.ClusterSingletonProxySettings;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
+import org.apache.usergrid.persistence.actorsystem.RouterProducer;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.distributed.actors.QueueWriterRouter;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class QueueWriterRouterProducer implements RouterProducer {
+
+    static Injector injector;
+    ActorSystemManager actorSystemManager;
+    QakkaFig qakkaFig;
+
+
+    @Inject
+    public QueueWriterRouterProducer(
+            Injector injector,
+            ActorSystemManager actorSystemManager,
+            QakkaFig qakkaFig) {
+
+        this.injector = injector;
+        this.actorSystemManager = actorSystemManager;
+        this.qakkaFig = qakkaFig;
+    }
+
+
+    @Override
+    public String getRouterPath() {
+        return "/user/queueWriterRouterProxy";
+    }
+
+
+    @Override
+    public void produceRouter(ActorSystem system, String role) {
+
+        ClusterSingletonManagerSettings settings =
+                ClusterSingletonManagerSettings.create( system ).withRole( "io" );
+
+        system.actorOf( ClusterSingletonManager.props(
+                Props.create( GuiceActorProducer.class, injector, QueueWriterRouter.class ),
+                PoisonPill.getInstance(), settings ), "queueWriterRouter" );
+
+        ClusterSingletonProxySettings proxySettings =
+                ClusterSingletonProxySettings.create( system ).withRole( role );
+
+        system.actorOf(
+                ClusterSingletonProxy.props( "/user/queueWriterRouter", proxySettings ), "queueWriterRouterProxy" );
+    }
+
+
+    @Override
+    public void addConfiguration(Map<String, Object> configMap) {
+
+        int numInstancesPerNode = qakkaFig.getNumQueueWriterActors();
+
+        Map<String, Object> akka = (Map<String, Object>) configMap.get( "akka" );
+        final Map<String, Object> deploymentMap;
+
+        if ( akka.get( "actor" ) == null ) {
+            deploymentMap = new HashMap<>();
+            akka.put( "actor", new HashMap<String, Object>() {{
+                put( "deployment", deploymentMap );
+            }} );
+
+        } else if (((Map) akka.get( "actor" )).get( "deployment" ) == null) {
+            deploymentMap = new HashMap<>();
+            ((Map) akka.get( "actor" )).put( "deployment", deploymentMap );
+
+        } else {
+            deploymentMap = (Map<String, Object>) ((Map) akka.get( "actor" )).get( "deployment" );
+        }
+
+        deploymentMap.put( "/queueWriterRouter/singleton/router", new HashMap<String, Object>() {{
+            put( "router", "round-robin-pool" );
+            put( "cluster", new HashMap<String, Object>() {{
+                put( "enabled", "on" );
+                put( "allow-local-routees", "on" );
+                put( "use-role", "io" );
+                put( "max-nr-of-instances-per-node", numInstancesPerNode );
+                put( "failure-detector", new HashMap<String, Object>() {{
+                    put( "threshold", "10" );
+                    put( "acceptable-heartbeat-pause", "3 s" );
+                    put( "heartbeat-interval", "1 s" );
+                    put( "heartbeat-request", new HashMap<String, Object>() {{
+                        put( "expected-response-after", "3 s" );
+                    }} );
+                }} );
+            }} );
+        }} );
+
+    }
+
+    @Override
+    public Collection<Class> getMessageTypes() {
+        return Collections.singletonList( QueueWriteRequest.class );
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java
new file mode 100644
index 0000000..a1bbf14
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import java.io.Serializable;
+
+/**
+ * Marker interface
+ */
+public interface QakkaMessage extends Serializable {
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckRequest.java
new file mode 100644
index 0000000..4beb46b
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckRequest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+import java.util.UUID;
+
+
+public class QueueAckRequest implements QakkaMessage {
+    private final String queueName;
+    private final UUID queueMessageId;
+
+
+    public QueueAckRequest( String queueName, UUID queueMessageId ) {
+        this.queueName = queueName;
+        this.queueMessageId = queueMessageId;
+    }
+
+    public String getQueueName() {
+        return queueName;
+    }
+
+    public UUID getQueueMessageId() {
+        return queueMessageId;
+    }
+
+    public String toString() {
+        return new ToStringBuilder( this )
+                .append( "queueName", queueName )
+                .append( "queueMessageId", queueMessageId )
+                .toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckResponse.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckResponse.java
new file mode 100644
index 0000000..68e6213
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueAckResponse.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+
+import java.util.UUID;
+
+
+public class QueueAckResponse implements QakkaMessage {
+    private final String queueName;
+    private final UUID messageId;
+    private final DistributedQueueService.Status status;
+
+    public QueueAckResponse( String queueName, UUID messageId, DistributedQueueService.Status status ) {
+        this.queueName = queueName;
+        this.messageId = messageId;
+        this.status = status;
+    }
+
+    public String getQueueName() {
+        return queueName;
+    }
+
+    public UUID getMessageId() {
+        return messageId;
+    }
+
+    public DistributedQueueService.Status getStatus() {
+        return status;
+    }
+
+    public String toString() {
+        return new ToStringBuilder( this )
+                .append( "queueName", queueName )
+                .append( "messageId", messageId )
+                .append( "status", status )
+                .toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetRequest.java
new file mode 100644
index 0000000..c23dcf6
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetRequest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+public class QueueGetRequest implements QakkaMessage {
+    private final String queueName;
+    private final int numRequested;
+
+    public QueueGetRequest(String queueName, int numRequested ) {
+        this.queueName = queueName;
+        this.numRequested = numRequested;
+    }
+
+    public String getQueueName() {
+        return queueName;
+    }
+
+    public int getNumRequested() {
+        return numRequested;
+    }
+
+    public String toString() {
+        return new ToStringBuilder( this )
+                .append( "queueName", queueName )
+                .append( "numRequested", numRequested )
+                .toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java
new file mode 100644
index 0000000..c8004fb
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+
+import java.util.Collection;
+import java.util.Collections;
+
+
+public class QueueGetResponse implements QakkaMessage {
+    private final Collection<DatabaseQueueMessage> queueMessages;
+    private final DistributedQueueService.Status status;
+
+
+    public QueueGetResponse(DistributedQueueService.Status status ) {
+        this.status = status;
+        this.queueMessages = Collections.emptyList();
+    }
+
+    public QueueGetResponse(DistributedQueueService.Status status, Collection<DatabaseQueueMessage> queueMessages) {
+        this.status = status;
+        this.queueMessages = queueMessages;
+    }
+
+    public DistributedQueueService.Status getStatus() {
+        return status;
+    }
+
+    public boolean isSuccess() {
+        return status.equals( DistributedQueueService.Status.SUCCESS);
+    }
+
+    public Collection<DatabaseQueueMessage> getQueueMessages() {
+        return queueMessages;
+    }
+
+    public String toString() {
+        return new ToStringBuilder( this )
+                .append( "messageCount", queueMessages.size() )
+                .append( "status", status )
+                .toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueInitRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueInitRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueInitRequest.java
new file mode 100644
index 0000000..10180cd
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueInitRequest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+
+public class QueueInitRequest implements QakkaMessage {
+    private final String queueName;
+
+
+    public QueueInitRequest(String queueName ) {
+        this.queueName = queueName;
+    }
+
+    public String getQueueName() {
+        return queueName;
+    }
+
+    public String toString() {
+        return new ToStringBuilder( this )
+                .append( "queueName", queueName )
+                .toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java
new file mode 100644
index 0000000..a81a6fd
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+
+public class QueueRefreshRequest implements QakkaMessage {
+    private final String queueName;
+
+
+    public QueueRefreshRequest(String queueName ) {
+        this.queueName = queueName;
+    }
+
+    public String getQueueName() {
+        return queueName;
+    }
+
+    public String toString() {
+        return new ToStringBuilder( this )
+                .append( "queueName", queueName )
+                .toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendRequest.java
new file mode 100644
index 0000000..8a655f4
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendRequest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+import java.util.UUID;
+
+
+public class QueueSendRequest implements QakkaMessage {
+
+    private final String queueName;
+    private final String sourceRegion;
+    private final String destRegion;
+    private final UUID messageId;
+    private Long deliveryTime;
+    private Long expirationTime;
+
+
+    public QueueSendRequest(
+            String queueName, String sourceRegion, String destRegion, UUID messageId,
+            Long deliveryTime, Long expirationTime) {
+
+        this.queueName = queueName;
+        this.sourceRegion = sourceRegion;
+        this.destRegion = destRegion;
+        this.messageId = messageId;
+        this.deliveryTime = deliveryTime;
+        this.expirationTime = expirationTime;
+    }
+
+    public String getQueueName() {
+        return queueName;
+    }
+
+    public String getSourceRegion() {
+        return sourceRegion;
+    }
+
+    public String getDestRegion() {
+        return destRegion;
+    }
+
+    public UUID getMessageId() {
+        return messageId;
+    }
+
+    public Long getExpirationTime() {
+        return expirationTime;
+    }
+
+    public Long getDeliveryTime() {
+        return deliveryTime;
+    }
+
+    public String toString() {
+        return new ToStringBuilder( this )
+                .append( "queueName", queueName )
+                .append( "sourceRegion", sourceRegion )
+                .append( "destRegion", destRegion )
+                .append( "messageId", messageId )
+                .append( "expirationTime", expirationTime )
+                .append( "deliveryTime", deliveryTime )
+                .toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java
new file mode 100644
index 0000000..0c295a0
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+
+
+public class QueueSendResponse implements QakkaMessage {
+    private final DistributedQueueService.Status status;
+
+    public QueueSendResponse(DistributedQueueService.Status status) {
+        this.status = status;
+    }
+
+    public DistributedQueueService.Status getSendStatus() {
+        return status;
+    }
+
+    public String toString() {
+        return new ToStringBuilder( this )
+                .append( "status", status )
+                .toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueTimeoutRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueTimeoutRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueTimeoutRequest.java
new file mode 100644
index 0000000..5358459
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueTimeoutRequest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+
+public class QueueTimeoutRequest implements QakkaMessage {
+    private final String queueName;
+
+
+    public QueueTimeoutRequest(String queueName ) {
+        this.queueName = queueName;
+    }
+
+    public String getQueueName() {
+        return queueName;
+    }
+
+    public String toString() {
+        return new ToStringBuilder( this )
+                .append( "queueName", queueName )
+                .toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteRequest.java
new file mode 100644
index 0000000..c7411de
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteRequest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+import java.util.UUID;
+
+
+public class QueueWriteRequest implements QakkaMessage {
+
+    private final String queueName;
+    private final String sourceRegion;
+    private final String destRegion;
+    private final UUID messageId;
+    private Long deliveryTime;
+    private Long expirationTime;
+
+
+    public QueueWriteRequest(
+            String queueName, String sourceRegion, String destRegion, UUID messageId,
+            Long deliveryTime, Long expirationTime) {
+
+        this.queueName = queueName;
+        this.sourceRegion = sourceRegion;
+        this.destRegion = destRegion;
+        this.messageId = messageId;
+        this.deliveryTime = deliveryTime;
+        this.expirationTime = expirationTime;
+    }
+
+    public String getQueueName() {
+        return queueName;
+    }
+
+    public String getSourceRegion() {
+        return sourceRegion;
+    }
+
+    public String getDestRegion() {
+        return destRegion;
+    }
+
+    public UUID getMessageId() {
+        return messageId;
+    }
+
+    public Long getExpirationTime() {
+        return expirationTime;
+    }
+
+    public Long getDeliveryTime() {
+        return deliveryTime;
+    }
+
+    public String toString() {
+        return new ToStringBuilder( this )
+                .append( "queueName", queueName )
+                .append( "sourceRegion", sourceRegion )
+                .append( "destRegion", destRegion )
+                .append( "messageId", messageId )
+                .append( "expirationTime", expirationTime )
+                .append( "deliveryTime", deliveryTime )
+                .toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java
new file mode 100644
index 0000000..1eb513c
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.usergrid.persistence.qakka.distributed.actors.QueueWriter;
+
+
+public class QueueWriteResponse implements QakkaMessage {
+    private final QueueWriter.WriteStatus status;
+
+    public QueueWriteResponse(QueueWriter.WriteStatus status) {
+        this.status = status;
+    }
+
+    public QueueWriter.WriteStatus getSendStatus() {
+        return status;
+    }
+
+    public String toString() {
+        return new ToStringBuilder( this )
+                .append( "status", status )
+                .toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/ShardCheckRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/ShardCheckRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/ShardCheckRequest.java
new file mode 100644
index 0000000..78dbe13
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/ShardCheckRequest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.messages;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+
+public class ShardCheckRequest implements QakkaMessage {
+    private final String queueName;
+
+
+    public ShardCheckRequest(String queueName ) {
+        this.queueName = queueName;
+    }
+
+    public String getQueueName() {
+        return queueName;
+    }
+
+    public String toString() {
+        return new ToStringBuilder( this )
+                .append( "queueName", queueName )
+                .toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/BadRequestException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/BadRequestException.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/BadRequestException.java
new file mode 100644
index 0000000..500fb6a
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/BadRequestException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.exceptions;
+
+
+public class BadRequestException extends QakkaRuntimeException {
+
+    public BadRequestException(String message) {
+        super( message );
+    }
+
+    public BadRequestException(String message, Throwable cause) {
+        super( message, cause );
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/NotFoundException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/NotFoundException.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/NotFoundException.java
new file mode 100644
index 0000000..5f76163
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/NotFoundException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.exceptions;
+
+
+public class NotFoundException extends QakkaRuntimeException {
+
+    public NotFoundException(String message) {
+        super( message );
+    }
+
+    public NotFoundException(String message, Throwable cause) {
+        super( message, cause );
+    }
+}