You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/16 22:34:41 UTC

[12/25] usergrid git commit: Initial integration of Qakka into Usergrid Queue module, and implementation of Qakka-based LegacyQueueManager implementation.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QakkaUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QakkaUtils.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QakkaUtils.java
new file mode 100644
index 0000000..9fe5c6e
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QakkaUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core;
+
+import com.datastax.driver.core.utils.UUIDs;
+
+import java.util.UUID;
+
+public class QakkaUtils {
+
+    public static UUID getTimeUuid() {
+        return UUIDs.timeBased();
+    }
+
+    public static Boolean isTimeUuid(UUID uuid) {
+        return uuid.version() == 1;
+    }
+
+    public static Boolean isNullOrEmpty(String s) {
+        return (s == null || s.equals(""));
+    }
+
+    public static UUID getTimeUUID(long when) {
+        return UUIDs.startOf( when );
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/Queue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/Queue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/Queue.java
new file mode 100644
index 0000000..e3d2790
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/Queue.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core;
+
+import org.apache.usergrid.persistence.qakka.serialization.queues.DatabaseQueue;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement
+public class Queue {
+
+    // will eventually control these via properties file
+    private static final Integer defaultRetryCount = 3;
+    private static final Integer defaultHandlingTimeoutSec = 30;
+    private static final String defaultDeadLetterQueueExtension = "_DLQ";
+
+    private String name;
+    private String queueType;
+    private String regions;
+    private String defaultDestinations;
+    private Long defaultDelayMs;
+    private Integer retryCount;
+    private Integer handlingTimeoutSec;
+    private String deadLetterQueue;
+
+    public Queue() {} // Jackson needs no-arg ctor
+
+    public Queue(String name, String queueType, String regions, String defaultDestinations, Long defaultDelayMs,
+                 Integer retryCount, Integer handlingTimeoutSec, String deadLetterQueue) {
+        this.name = name;
+        this.queueType = queueType;
+        this.regions = regions;
+        this.defaultDestinations = defaultDestinations;
+        this.defaultDelayMs = defaultDelayMs;
+        this.retryCount = retryCount;
+        this.handlingTimeoutSec = handlingTimeoutSec;
+        this.deadLetterQueue = deadLetterQueue;
+    }
+
+    public Queue(String name, String queueType, String regions, String defaultDestinations, Long defaultDelayMs) {
+        this(name, queueType, regions, defaultDestinations, defaultDelayMs, defaultRetryCount,
+                defaultHandlingTimeoutSec, name + defaultDeadLetterQueueExtension);
+    }
+
+    public Queue(String name, String queueType, String regions, String defaultDestinations) {
+        this(name, queueType, regions, defaultDestinations, 0L, defaultRetryCount,
+                defaultHandlingTimeoutSec, name + defaultDeadLetterQueueExtension);
+    }
+
+    public Queue(String name) {
+        this(name, QueueType.MULTIREGION, Regions.LOCAL, Regions.LOCAL, 0L, defaultRetryCount,
+                defaultHandlingTimeoutSec, name + defaultDeadLetterQueueExtension);
+    }
+
+    public Queue(DatabaseQueue databaseQueue) {
+        this(   databaseQueue.getName(),
+                QueueType.MULTIREGION,
+                databaseQueue.getRegions(),
+                databaseQueue.getDefaultDestinations(),
+                databaseQueue.getDefaultDelayMs(),
+                databaseQueue.getRetryCount(),
+                databaseQueue.getHandlingTimeoutSec(),
+                databaseQueue.getDeadLetterQueue());
+    }
+
+    public String getName() {
+        return name;
+    }
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getQueueType() {
+        return queueType;
+    }
+
+    public String getRegions() {
+        return regions;
+    }
+    public void setRegions(String regions) {
+        this.regions = regions;
+    }
+
+    public String getDefaultDestinations() {
+        return defaultDestinations;
+    }
+    public void setDefaultDestinations(String defaultDestinations) {
+        this.defaultDestinations = defaultDestinations;
+    }
+
+    public Long getDefaultDelayMs() {
+        return defaultDelayMs;
+    }
+    public void setDefaultDelayMs(Long defaultDelayMs) {
+        this.defaultDelayMs = defaultDelayMs;
+    }
+
+    public Integer getRetryCount() {
+        return retryCount;
+    }
+    public void setRetryCount(Integer retryCount) {
+        this.retryCount = retryCount;
+    }
+
+    public Integer getHandlingTimeoutSec() {
+        return handlingTimeoutSec;
+    }
+    public void setHandlingTimeoutSec(Integer handlingTimeoutSec) {
+        this.handlingTimeoutSec = handlingTimeoutSec;
+    }
+
+    public String getDeadLetterQueue() {
+        return deadLetterQueue;
+    }
+    public void setDeadLetterQueue(String deadLetterQueue) {
+        this.deadLetterQueue = deadLetterQueue;
+    }
+
+    public DatabaseQueue toDatabaseQueue() {
+        return new DatabaseQueue(
+                name,
+                regions,
+                defaultDestinations,
+                defaultDelayMs,
+                retryCount,
+                handlingTimeoutSec,
+                deadLetterQueue);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueManager.java
new file mode 100644
index 0000000..478fa12
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueManager.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core;
+
+import java.util.List;
+
+public interface QueueManager {
+
+    void createQueue(Queue queue);
+
+    void updateQueueConfig(Queue queue);
+
+    void deleteQueue(String queueName);
+
+    Queue getQueueConfig(String queueName);
+
+    List<String> getListOfQueues();
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessage.java
new file mode 100644
index 0000000..e79a241
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessage.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+import java.util.UUID;
+
+@XmlRootElement
+public class QueueMessage implements Serializable {
+
+    private UUID queueMessageId;
+    private UUID messageId;
+
+    private String queueName;
+    private String sendingRegion;
+    private String receivingRegion;
+
+    private Long delayUntilDate;
+    private Long expirationDate;
+    private Long createDate;
+    private Long retries;
+
+    private Boolean dataReceived;
+
+    /** MIME content type of data */
+    private String contentType;
+
+    /** If contentType is application/json then data will be the JSON payload for this queue message */
+    private String data;
+
+    /** If contentType is not then href will be the URL where the payload may be fetched */
+    private String href;
+
+
+    public QueueMessage() {} // for Jackson
+
+    public QueueMessage(
+            UUID queueMessageId, String queueName, String sendingRegion, String receivingRegion, UUID messageId,
+            Long delayUntilDate, Long expirationDate, Long createDate, Long retries, Boolean dataReceived) {
+
+        if (queueMessageId == null) {
+            this.queueMessageId = QakkaUtils.getTimeUuid();
+        } else {
+            this.queueMessageId = queueMessageId;
+        }
+        this.queueName        = queueName;
+        this.sendingRegion    = sendingRegion;
+        this.receivingRegion  = receivingRegion;
+        this.messageId        = messageId;
+        this.delayUntilDate   = delayUntilDate;
+        this.expirationDate   = expirationDate;
+        this.createDate       = createDate;
+        this.retries          = retries;
+        this.dataReceived     = dataReceived;
+    }
+
+    public UUID getQueueMessageId() {
+        return queueMessageId;
+    }
+
+    public void setQueueMessageId(UUID queueMessageId) {
+        this.queueMessageId = queueMessageId;
+    }
+
+    public String getQueueName() {
+        return queueName;
+    }
+
+    public String getSendingRegion() {
+        return sendingRegion;
+    }
+
+    public String getReceivingRegion() {
+        return receivingRegion;
+    }
+
+    public UUID getMessageId() {
+        return messageId;
+    }
+
+    public Long getDelayUntilDate() {
+        return delayUntilDate;
+    }
+
+    public Long getDelayUntilMs() {
+        if ( delayUntilDate == null ) {
+            return null;
+        }
+        return delayUntilDate - System.currentTimeMillis();
+    }
+
+    public void setDelayUntilDate(Long delayUntilDate) {
+        this.delayUntilDate = delayUntilDate;
+    }
+
+    public void setDelayUntilMs(Long delayMs) {
+        this.delayUntilDate = System.currentTimeMillis() + delayMs;
+    }
+
+    public Long getExpirationDate() {
+        return expirationDate;
+    }
+
+    public Long getExpirationMs() {
+        if ( expirationDate == null ) {
+            return null;
+        }
+        return expirationDate - System.currentTimeMillis();
+    }
+
+    public void setExpirationDate(Long expirationDate) {
+        this.expirationDate = expirationDate;
+    }
+
+    public void setExpirationMs(Long expirationMs) {
+        this.expirationDate = System.currentTimeMillis() + expirationMs;
+    }
+
+    public Long getCreateDate() {
+        return createDate;
+    }
+
+    public void setCreateDate(Long createDate) {
+        this.createDate = createDate;
+    }
+
+    public Long getRetries() {
+        return retries;
+    }
+
+    public void setRetries(Long retries) {
+        this.retries = retries;
+    }
+
+    public Boolean getDataReceived() {
+        return dataReceived;
+    }
+
+    public void setDataReceived(Boolean dataReceived) {
+        this.dataReceived = dataReceived;
+    }
+
+
+    public String getData() {
+        return data;
+    }
+
+    public void setData(String data) {
+        this.data = data;
+    }
+
+    public String getContentType() {
+        return contentType;
+    }
+
+    public void setContentType(String contentType) {
+        this.contentType = contentType;
+    }
+
+    public String getHref() {
+        return href;
+    }
+
+    public void setHref(String href) {
+        this.href = href;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
new file mode 100644
index 0000000..15203d8
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+
+
+public interface QueueMessageManager {
+
+    /**
+     * Send Queue Message to one or more destination regions.
+     *  @param queueName Name of queue
+     * @param destinationRegions List of destination regions
+     * @param delayMs Delay before sending queue message
+     * @param expirationSecs Time before message expires
+     * @param contentType Content type of message data
+     * @param messageData Message content
+     */
+    void sendMessages(String queueName, List<String> destinationRegions,
+                      Long delayMs, Long expirationSecs, String contentType, ByteBuffer messageData);
+
+    /**
+     * Get next available messages from the specified queue.
+     *
+     * @param queueName Name of queue
+     * @param count Number of messages to get
+     * @return List of next messages, empty if non-available
+     */
+    List<QueueMessage> getNextMessages(String queueName, int count);
+
+    /**
+     * Acknowledge that message has been received and is no longer inflight.
+     *
+     * @param queueName Name of queue
+     * @param queueMessageId ID of queue message
+     */
+    void ackMessage(String queueName, UUID queueMessageId);
+
+    /**
+     * Put message back in the queue.
+     *
+     * @param queueName Name of the queue
+     * @param messageId ID of the queue message
+     * @param delayMs Delay before re-queueing message
+     */
+    void requeueMessage(String queueName, UUID messageId, Long delayMs);
+
+    /**
+     * Clear all messages from queue
+     *
+     * @param queueName Name of queue
+     */
+    void clearMessages(String queueName);
+
+    /**
+     * Get message payload data.
+     */
+    ByteBuffer getMessageData(UUID messageId);
+
+    /**
+     * Get message from messages available or messages inflight storage.
+     */
+    QueueMessage getMessage(String queueName, UUID queueMessageId);
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueType.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueType.java
new file mode 100644
index 0000000..f0f59e5
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core;
+
+
+public class QueueType {
+    public static final String LOCAL = "_LOCAL";
+    public static final String MULTIREGION = "_MULTIREGION";
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/Regions.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/Regions.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/Regions.java
new file mode 100644
index 0000000..3097e92
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/Regions.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+
+@Singleton
+public class Regions {
+    public static final String LOCAL = "LOCAL";
+    public static final String ALL = "ALL";
+    public static final String REMOTE = "REMOTE";
+
+    // load regions from properties
+    String localRegion;
+    List<String> regionList;
+
+
+    @Inject
+    public Regions( ActorSystemFig actorSystemFig ) {
+        localRegion = actorSystemFig.getRegionLocal();
+        regionList = Arrays.asList( actorSystemFig.getRegionsList().split(","));
+    }
+
+
+    public List<String> getRegions(String region) {
+        List<String> ret = null;
+
+        switch (region) {
+            case ALL:
+                ret = new ArrayList<>(regionList);
+                break;
+            case LOCAL:
+                ret = Collections.singletonList(localRegion);
+                break;
+            case REMOTE:
+                ret = new ArrayList<>(regionList);
+                ret.remove(localRegion);
+                break;
+            default:
+                // parse regions into list -- assume a single region now, but can do region1,region2
+
+                // validate regions
+
+                ret = Collections.singletonList(region);
+                break;
+        }
+
+        return ret;
+    }
+
+    public String getLocalRegion() {
+        return localRegion;
+    }
+
+    public Boolean isValidRegion(String region) {
+        return regionList.contains(region);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
new file mode 100644
index 0000000..474ef5c
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core.impl;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.distributed.actors.QueueRefresher;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+
+@Singleton
+public class InMemoryQueue {
+    private static final Logger logger = LoggerFactory.getLogger( QueueRefresher.class );
+
+    private final Map<String, Queue<DatabaseQueueMessage>> queuesByName;
+    private final Map<String, UUID> newestByQueueName;
+
+
+    @Inject
+    InMemoryQueue(QakkaFig qakkaFig) {
+        queuesByName = new HashMap<>( qakkaFig.getQueueInMemorySize() );
+        newestByQueueName = new HashMap<>( qakkaFig.getQueueInMemorySize() );
+    }
+
+    private Queue<DatabaseQueueMessage> getQueue( String queueName ) {
+        synchronized ( queuesByName ) {
+            if ( !queuesByName.containsKey( queueName )) {
+                queuesByName.put( queueName, new ConcurrentLinkedQueue<>() );
+            }
+            return queuesByName.get( queueName );
+        }
+    }
+
+    public void add( String queueName, DatabaseQueueMessage databaseQueueMessage ) {
+        UUID newest = newestByQueueName.get( queueName );
+        if ( newest == null ) {
+            newest = databaseQueueMessage.getQueueMessageId();
+        } else {
+            if ( databaseQueueMessage.getQueueMessageId().compareTo( newest ) > 0 ) {
+                newest = databaseQueueMessage.getQueueMessageId();
+            }
+        }
+        newestByQueueName.put( queueName, newest );
+        getQueue( queueName ).add( databaseQueueMessage );
+    }
+
+    public UUID getNewest( String queueName ) {
+        return newestByQueueName.get( queueName );
+    }
+
+    public DatabaseQueueMessage poll( String queueName ) {
+        return getQueue( queueName ).poll();
+    }
+
+    public int size( String queueName ) {
+        return getQueue( queueName ).size();
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
new file mode 100644
index 0000000..bbb46a8
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core.impl;
+
+import com.google.inject.Inject;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.apache.usergrid.persistence.qakka.core.Queue;
+import org.apache.usergrid.persistence.qakka.core.QueueManager;
+import org.apache.usergrid.persistence.qakka.core.Regions;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.serialization.queues.DatabaseQueue;
+import org.apache.usergrid.persistence.qakka.serialization.queues.QueueSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class QueueManagerImpl implements QueueManager {
+    private final ActorSystemFig          actorSystemFig;
+    private final QueueSerialization queueSerialization;
+    private final DistributedQueueService distributedQueueService;
+    private final ShardSerialization      shardSerialization;
+
+
+    @Inject
+    public QueueManagerImpl(
+            ActorSystemFig          actorSystemFig,
+            QueueSerialization      queueSerialization,
+            DistributedQueueService distributedQueueService,
+            ShardSerialization      shardSerialization ) {
+
+        this.actorSystemFig          = actorSystemFig;
+        this.queueSerialization      = queueSerialization;
+        this.distributedQueueService = distributedQueueService;
+        this.shardSerialization      = shardSerialization;
+    }
+
+    @Override
+    public void createQueue(Queue queue) {
+
+        queueSerialization.writeQueue(queue.toDatabaseQueue());
+
+        List<String> regions = new ArrayList<>();
+
+        if ( Regions.LOCAL.equals( queue.getRegions() ) || StringUtils.isEmpty( queue.getRegions() ) ) {
+            regions.add( actorSystemFig.getRegionLocal() );
+
+        } else if ( Regions.ALL.equals( queue.getRegions() )) {
+            for ( String region : actorSystemFig.getRegionsList().split(",")) {
+                regions.add( region );
+            }
+
+        } else {
+            for (String region : queue.getRegions().split( "," )) {
+                regions.add( region );
+            }
+        }
+
+        for ( String region : regions ) {
+
+            Shard available = new Shard( queue.getName(), region, Shard.Type.DEFAULT, 1L, QakkaUtils.getTimeUuid());
+            shardSerialization.createShard( available );
+
+            Shard inflight = new Shard( queue.getName(), region, Shard.Type.INFLIGHT, 1L, QakkaUtils.getTimeUuid());
+            shardSerialization.createShard( inflight );
+        }
+
+        distributedQueueService.initQueue( queue.getName() );
+        distributedQueueService.refreshQueue( queue.getName() );
+    }
+
+    @Override
+    public void updateQueueConfig(Queue queue) {
+
+        queueSerialization.writeQueue(queue.toDatabaseQueue());
+
+        distributedQueueService.initQueue( queue.getName() );
+        distributedQueueService.refreshQueue( queue.getName() );
+    }
+
+    @Override
+    public void deleteQueue(String queueName) {
+
+        queueSerialization.deleteQueue(queueName);
+
+        // TODO: implement delete queue for Akka, stop schedulers, etc.
+        //qas.deleteQueue(queueName);
+    }
+
+    @Override
+    public Queue getQueueConfig(String queueName) {
+
+        DatabaseQueue databaseQueue = queueSerialization.getQueue(queueName);
+        if ( databaseQueue != null ) {
+            return new Queue( databaseQueue );
+        }
+        return null;
+    }
+
+    @Override
+    public List<String> getListOfQueues() {
+        return queueSerialization.getListOfQueues();
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
new file mode 100644
index 0000000..bcd0f58
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core.impl;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.api.URIStrategy;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.apache.usergrid.persistence.qakka.core.QueueManager;
+import org.apache.usergrid.persistence.qakka.core.QueueMessage;
+import org.apache.usergrid.persistence.qakka.core.QueueMessageManager;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.exceptions.BadRequestException;
+import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+
+@Singleton
+public class QueueMessageManagerImpl implements QueueMessageManager {
+
+    private static final Logger logger = LoggerFactory.getLogger( QueueMessageManagerImpl.class );
+
+    private final ActorSystemFig            actorSystemFig;
+    private final QueueManager              queueManager;
+    private final QueueMessageSerialization queueMessageSerialization;
+    private final DistributedQueueService   distributedQueueService;
+    private final TransferLogSerialization  transferLogSerialization;
+    private final URIStrategy uriStrategy;
+
+
+    @Inject
+    public QueueMessageManagerImpl(
+            ActorSystemFig            actorSystemFig,
+            QueueManager              queueManager,
+            QueueMessageSerialization queueMessageSerialization,
+            DistributedQueueService   distributedQueueService,
+            TransferLogSerialization  transferLogSerialization,
+            URIStrategy               uriStrategy
+            ) {
+
+        this.actorSystemFig            = actorSystemFig;
+        this.queueManager              = queueManager;
+        this.queueMessageSerialization = queueMessageSerialization;
+        this.distributedQueueService   = distributedQueueService;
+        this.transferLogSerialization  = transferLogSerialization;
+        this.uriStrategy               = uriStrategy;
+    }
+
+
+    @Override
+    public void sendMessages(String queueName, List<String> destinationRegions,
+            Long delayMs, Long expirationSecs, String contentType, ByteBuffer messageData) {
+
+        // TODO: implement delay and expiration
+
+//        Preconditions.checkArgument(delayMs == null || delayMs > 0L,
+//                "Delay milliseconds must be greater than zero");
+//        Preconditions.checkArgument(expirationSecs == null || expirationSecs > 0L,
+//                "Expiration seconds must be greater than zero");
+
+        if ( queueManager.getQueueConfig( queueName ) == null ) {
+            throw new NotFoundException( "Queue not found: " + queueName );
+        }
+
+        // get current time
+        Long currentTimeMs = System.currentTimeMillis();
+
+        // create message id
+        UUID messageId = QakkaUtils.getTimeUuid();
+
+        Long deliveryTime = delayMs != null ? currentTimeMs + delayMs : null;
+        Long expirationTime = expirationSecs != null ? currentTimeMs + (1000 * expirationSecs) : null;
+
+        // write message data to C*
+        queueMessageSerialization.writeMessageData(
+                messageId, new DatabaseQueueMessageBody(messageData, contentType));
+
+        for (String region : destinationRegions) {
+
+            transferLogSerialization.recordTransferLog(
+                    queueName, actorSystemFig.getRegionLocal(), region, messageId );
+
+            // send message to destination region's queue
+            DistributedQueueService.Status status = null;
+            try {
+                status = distributedQueueService.sendMessageToRegion(
+                        queueName,
+                        actorSystemFig.getRegionLocal(),
+                        region,
+                        messageId,
+                        deliveryTime,
+                        expirationTime );
+
+                //logger.debug("Send message to queueName {} in region {}", queueName, region );
+
+            } catch ( QakkaRuntimeException qae ) {
+                logger.error("Error sending message " + messageId + " to " + region, qae);
+            }
+        }
+    }
+
+
+    @Override
+    public List<QueueMessage> getNextMessages(String queueName, int count) {
+
+        if ( queueManager.getQueueConfig( queueName ) == null ) {
+            throw new NotFoundException( "Queue not found: " + queueName );
+        }
+
+        Collection<DatabaseQueueMessage> dbMessages = distributedQueueService.getNextMessages( queueName, count );
+
+        List<QueueMessage> queueMessages = joinMessages( queueName, dbMessages );
+
+        if ( queueMessages.size() < count && queueMessages.size() < dbMessages.size() ) {
+            logger.debug("Messages failed to join for queue:{}, get more", queueName);
+
+            // some messages failed to join, get more
+            dbMessages = distributedQueueService.getNextMessages( queueName, count - queueMessages.size() );
+            queueMessages.addAll( joinMessages( queueName, dbMessages ) );
+        }
+
+        return queueMessages;
+    }
+
+
+    private List<QueueMessage> joinMessages( String queueName, Collection<DatabaseQueueMessage> dbMessages) {
+
+        List<QueueMessage> queueMessages = new ArrayList<>();
+
+        for (DatabaseQueueMessage dbMessage : dbMessages) {
+
+            DatabaseQueueMessageBody data = queueMessageSerialization.loadMessageData( dbMessage.getMessageId() );
+
+            if ( data != null ) {
+
+                QueueMessage queueMessage = new QueueMessage(
+                        dbMessage.getQueueMessageId(),
+                        queueName,
+                        null,                    // sending region
+                        dbMessage.getRegion(),   // receiving region
+                        dbMessage.getMessageId(),
+                        null,                    // delay until date
+                        null,                    // expiration date
+                        dbMessage.getQueuedAt(),
+                        null,                    // retries
+                        true );
+
+                queueMessage.setContentType( data.getContentType() );
+                if ( "application/json".equals( data.getContentType() )) {
+                    try {
+                        String json = new String( data.getBlob().array(), "UTF-8");
+                        queueMessage.setData( json );
+
+                    } catch (UnsupportedEncodingException e) {
+                        logger.error("Error unencoding data for messageId=" + queueMessage.getMessageId(), e);
+                    }
+                } else {
+                    try {
+                        queueMessage.setHref( uriStrategy.queueMessageDataURI(
+                                queueName, queueMessage.getQueueMessageId()).toString());
+
+                    } catch (URISyntaxException e) {
+                        throw new QakkaRuntimeException( "Error forming URI for message data", e );
+                    }
+                }
+
+                queueMessages.add( queueMessage );
+            }
+        }
+
+        return queueMessages;
+    }
+
+
+    @Override
+    public void ackMessage(String queueName, UUID queueMessageId) {
+
+        if ( queueManager.getQueueConfig( queueName ) == null ) {
+            throw new NotFoundException( "Queue not found: " + queueName );
+        }
+
+        DistributedQueueService.Status status = distributedQueueService.ackMessage( queueName, queueMessageId );
+
+        if ( DistributedQueueService.Status.BAD_REQUEST.equals( status )) {
+            throw new BadRequestException( "Message not inflight" );
+
+        } else if ( DistributedQueueService.Status.ERROR.equals( status )) {
+            throw new QakkaRuntimeException( "Unable to ack message due to error" );
+        }
+    }
+
+
+    @Override
+    public void requeueMessage(String queueName, UUID messageId, Long delayMs) {
+
+        if ( queueManager.getQueueConfig( queueName ) == null ) {
+            throw new NotFoundException( "Queue not found: " + queueName );
+        }
+
+        // TODO: implement requeueMessage
+
+        throw new UnsupportedOperationException( "requeueMessage not yet implemented" );
+    }
+
+
+    @Override
+    public void clearMessages(String queueName) {
+
+        if ( queueManager.getQueueConfig( queueName ) == null ) {
+            throw new NotFoundException( "Queue not found: " + queueName );
+        }
+
+        // TODO: implement clearMessages
+
+        throw new UnsupportedOperationException( "clearMessages not yet implemented" );
+    }
+
+
+    @Override
+    public ByteBuffer getMessageData( UUID messageId ) {
+        DatabaseQueueMessageBody body = queueMessageSerialization.loadMessageData( messageId );
+        return body != null ? body.getBlob() : null;
+    }
+
+
+    /**
+     * Get but do not put inflight specified queue message, first looking in INFLIGHT table then DEFAULT.
+     */
+    @Override
+    public QueueMessage getMessage( String queueName, UUID queueMessageId ) {
+
+        QueueMessage queueMessage = null;
+
+        // first look in INFLIGHT storage
+
+
+        DatabaseQueueMessage dbMessage = queueMessageSerialization.loadMessage(
+                queueName, actorSystemFig.getRegionLocal(), null,
+                DatabaseQueueMessage.Type.INFLIGHT, queueMessageId );
+
+        if ( dbMessage == null ) {
+
+            // not found, so now look in DEFAULT storage
+
+            dbMessage = queueMessageSerialization.loadMessage(
+                queueName, actorSystemFig.getRegionLocal(), null,
+                DatabaseQueueMessage.Type.DEFAULT, queueMessageId );
+        }
+
+        if ( dbMessage != null ) {
+            queueMessage = new QueueMessage(
+                    dbMessage.getQueueMessageId(),
+                    queueName,
+                    null,                    // sending region
+                    dbMessage.getRegion(),   // receiving region
+                    dbMessage.getMessageId(),
+                    null,                    // delay until date
+                    null,                    // expiration date
+                    dbMessage.getQueuedAt(),
+                    null,                    // retries
+                    true );
+        }
+
+        return queueMessage;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
new file mode 100644
index 0000000..c2ca6b1
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed;
+
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+
+import java.util.Collection;
+import java.util.UUID;
+
+
+/**
+ * Interface to distributed part of Qakka queue implementation.
+ */
+public interface DistributedQueueService {
+
+    enum Status { SUCCESS, ERROR, BAD_REQUEST };
+
+    void init();
+
+    void initQueue(String queueName);
+
+    void refresh();
+
+    void refreshQueue(String queueName);
+
+    void processTimeouts();
+
+    Status sendMessageToRegion(
+        String queueName,
+        String sourceRegion,
+        String destRegion,
+        UUID messageId,
+        Long deliveryTime,
+        Long expirationTime);
+
+    Collection<DatabaseQueueMessage> getNextMessages(String queueName, int numMessages);
+
+    Status ackMessage(String queueName, UUID messageId);
+
+    Status requeueMessage(String queueName, UUID messageId);
+
+    Status clearMessages(String queueName);
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
new file mode 100644
index 0000000..6ecffba
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import com.codahale.metrics.Timer;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.distributed.messages.*;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+
+public class QueueActor extends UntypedActor {
+    private static final Logger logger = LoggerFactory.getLogger( QueueActor.class );
+
+    private final QakkaFig qakkaFig;
+    private final InMemoryQueue inMemoryQueue;
+    private final QueueActorHelper queueActorHelper;
+    private final MetricsService   metricsService;
+
+    private final Map<String, Cancellable> refreshSchedulersByQueueName = new HashMap<>();
+    private final Map<String, Cancellable> timeoutSchedulersByQueueName = new HashMap<>();
+    private final Map<String, Cancellable> shardAllocationSchedulersByQueueName = new HashMap<>();
+
+    private final Map<String, ActorRef> queueReadersByQueueName    = new HashMap<>();
+    private final Map<String, ActorRef> queueTimeoutersByQueueName = new HashMap<>();
+    private final Map<String, ActorRef> shardAllocatorsByQueueName = new HashMap<>();
+
+
+    public QueueActor() {
+
+        Injector injector = App.INJECTOR;
+
+        qakkaFig         = injector.getInstance( QakkaFig.class );
+        inMemoryQueue    = injector.getInstance( InMemoryQueue.class );
+        queueActorHelper = injector.getInstance( QueueActorHelper.class );
+        metricsService   = injector.getInstance( MetricsService.class );
+    }
+
+    @Override
+    public void onReceive(Object message) {
+
+        if ( message instanceof QueueInitRequest) {
+            QueueInitRequest request = (QueueInitRequest)message;
+
+            if ( refreshSchedulersByQueueName.get( request.getQueueName() ) == null ) {
+                Cancellable scheduler = getContext().system().scheduler().schedule(
+                    Duration.create( 0, TimeUnit.MILLISECONDS),
+                    Duration.create( qakkaFig.getQueueRefreshMilliseconds(), TimeUnit.MILLISECONDS),
+                    self(),
+                    new QueueRefreshRequest( request.getQueueName() ),
+                    getContext().dispatcher(),
+                    getSelf());
+                refreshSchedulersByQueueName.put( request.getQueueName(), scheduler );
+            }
+
+            if ( timeoutSchedulersByQueueName.get( request.getQueueName() ) == null ) {
+                Cancellable scheduler = getContext().system().scheduler().schedule(
+                        Duration.create( 0, TimeUnit.MILLISECONDS),
+                        Duration.create( qakkaFig.getQueueTimeoutSeconds()/2, TimeUnit.SECONDS),
+                        self(),
+                        new QueueTimeoutRequest( request.getQueueName() ),
+                        getContext().dispatcher(),
+                        getSelf());
+                timeoutSchedulersByQueueName.put( request.getQueueName(), scheduler );
+            }
+
+            if ( shardAllocationSchedulersByQueueName.get( request.getQueueName() ) == null ) {
+                Cancellable scheduler = getContext().system().scheduler().schedule(
+                        Duration.create( 0, TimeUnit.MILLISECONDS),
+                        Duration.create( qakkaFig.getShardAllocationCheckFrequencyMillis(), TimeUnit.MILLISECONDS),
+                        self(),
+                        new ShardCheckRequest( request.getQueueName() ),
+                        getContext().dispatcher(),
+                        getSelf());
+                shardAllocationSchedulersByQueueName.put( request.getQueueName(), scheduler );
+            }
+
+        } else if ( message instanceof QueueRefreshRequest ) {
+            QueueRefreshRequest request = (QueueRefreshRequest)message;
+
+            if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) {
+                ActorRef readerRef = getContext().actorOf( Props.create(
+                    QueueRefresher.class, request.getQueueName()), request.getQueueName() + "_reader");
+                queueReadersByQueueName.put( request.getQueueName(), readerRef );
+            }
+
+            // hand-off to queue's reader
+            queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() );
+
+        } else if ( message instanceof QueueTimeoutRequest ) {
+            QueueTimeoutRequest request = (QueueTimeoutRequest)message;
+
+            if ( queueTimeoutersByQueueName.get( request.getQueueName() ) == null ) {
+                ActorRef readerRef = getContext().actorOf( Props.create(
+                    QueueTimeouter.class, request.getQueueName()), request.getQueueName() + "_timeouter");
+                queueTimeoutersByQueueName.put( request.getQueueName(), readerRef );
+            }
+
+            // hand-off to queue's timeouter
+            queueTimeoutersByQueueName.get( request.getQueueName() ).tell( request, self() );
+
+
+        } else if ( message instanceof ShardCheckRequest ) {
+            ShardCheckRequest request = (ShardCheckRequest)message;
+
+            if ( shardAllocatorsByQueueName.get( request.getQueueName() ) == null ) {
+                ActorRef readerRef = getContext().actorOf( Props.create(
+                        ShardAllocator.class, request.getQueueName()), request.getQueueName() + "_shard_allocator");
+                shardAllocatorsByQueueName.put( request.getQueueName(), readerRef );
+            }
+
+            // hand-off to queue's shard allocator
+            shardAllocatorsByQueueName.get( request.getQueueName() ).tell( request, self() );
+
+
+        } else if ( message instanceof QueueGetRequest) {
+
+            Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.GET_TIME_GET ).time();
+            try {
+                QueueGetRequest queueGetRequest = (QueueGetRequest) message;
+
+                Collection<DatabaseQueueMessage> queueMessages = new ArrayList<>();
+
+                while (queueMessages.size() < queueGetRequest.getNumRequested()) {
+
+                    DatabaseQueueMessage queueMessage = inMemoryQueue.poll( queueGetRequest.getQueueName() );
+
+                    if (queueMessage != null) {
+                        if (queueActorHelper.putInflight( queueGetRequest.getQueueName(), queueMessage )) {
+                            queueMessages.add( queueMessage );
+                        }
+                    } else {
+                        logger.debug("in-memory queue for {} is empty, object is: {}",
+                                queueGetRequest.getQueueName(), inMemoryQueue );
+                        break;
+                    }
+                }
+
+                getSender().tell( new QueueGetResponse(
+                        DistributedQueueService.Status.SUCCESS, queueMessages ), getSender() );
+
+            } finally {
+                timer.close();
+            }
+
+
+        } else if ( message instanceof QueueAckRequest) {
+
+            Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ACK_TIME_ACK ).time();
+            try {
+
+                QueueAckRequest queueAckRequest = (QueueAckRequest) message;
+
+                DistributedQueueService.Status status = queueActorHelper.ackQueueMessage(
+                        queueAckRequest.getQueueName(),
+                        queueAckRequest.getQueueMessageId() );
+
+                getSender().tell( new QueueAckResponse(
+                        queueAckRequest.getQueueName(),
+                        queueAckRequest.getQueueMessageId(),
+                        status ), getSender() );
+
+            } finally {
+                timer.close();
+            }
+
+        } else {
+            unhandled( message );
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
new file mode 100644
index 0000000..26db903
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+
+public class QueueActorHelper {
+    private static final Logger logger = LoggerFactory.getLogger( QueueActorHelper.class );
+
+    private final ActorSystemFig            actorSystemFig;
+    private final QueueMessageSerialization messageSerialization;
+    private final AuditLogSerialization     auditLogSerialization;
+
+
+    @Inject
+    public QueueActorHelper(
+            ActorSystemFig actorSystemFig,
+            QueueMessageSerialization messageSerialization,
+            AuditLogSerialization auditLogSerialization
+            ) {
+
+        this.actorSystemFig = actorSystemFig;
+        this.messageSerialization = messageSerialization;
+        this.auditLogSerialization = auditLogSerialization;
+    }
+
+
+    DatabaseQueueMessage loadDatabaseQueueMessage(
+            String queueName, UUID queueMessageId, DatabaseQueueMessage.Type type ) {
+
+        try {
+            return messageSerialization.loadMessage(
+                    queueName,
+                    actorSystemFig.getRegionLocal(),
+                    null,
+                    type,
+                    queueMessageId );
+
+        } catch (Throwable t) {
+            logger.error( "Error reading queueMessage", t );
+        }
+
+        return null;
+    }
+
+
+    DistributedQueueService.Status ackQueueMessage(String queueName, UUID queueMessageId ) {
+
+        DatabaseQueueMessage queueMessage = loadDatabaseQueueMessage(
+                queueName, queueMessageId, DatabaseQueueMessage.Type.INFLIGHT );
+
+        if ( queueMessage == null ) {
+            return DistributedQueueService.Status.BAD_REQUEST;
+        }
+
+        boolean error = false;
+        try {
+            messageSerialization.deleteMessage(
+                    queueName,
+                    actorSystemFig.getRegionLocal(),
+                    null,
+                    DatabaseQueueMessage.Type.INFLIGHT,
+                    queueMessageId );
+
+        } catch (Throwable t) {
+            logger.error( "Error deleting queueMessage for ack", t );
+            error = true;
+        }
+
+        if ( !error ) {
+
+            auditLogSerialization.recordAuditLog(
+                    AuditLog.Action.ACK,
+                    AuditLog.Status.SUCCESS,
+                    queueName,
+                    actorSystemFig.getRegionLocal(),
+                    queueMessage.getMessageId(),
+                    queueMessageId );
+
+            return DistributedQueueService.Status.SUCCESS;
+
+        } else {
+
+            auditLogSerialization.recordAuditLog(
+                    AuditLog.Action.ACK,
+                    AuditLog.Status.ERROR,
+                    queueName,
+                    actorSystemFig.getRegionLocal(),
+                    queueMessage.getMessageId(),
+                    queueMessageId );
+
+            return DistributedQueueService.Status.ERROR;
+        }
+    }
+
+
+    boolean putInflight( String queueName, DatabaseQueueMessage queueMessage ) {
+
+        UUID qmid = queueMessage.getQueueMessageId();
+        try {
+            queueMessage.setType( DatabaseQueueMessage.Type.INFLIGHT );
+            queueMessage.setShardId( null );
+            queueMessage.setInflightAt( System.currentTimeMillis() );
+            messageSerialization.writeMessage( queueMessage );
+
+            messageSerialization.deleteMessage(
+                    queueName,
+                    actorSystemFig.getRegionLocal(),
+                    null,
+                    DatabaseQueueMessage.Type.DEFAULT,
+                    qmid);
+
+            //logger.debug("Put message {} inflight for queue name {}", qmid, queueName);
+
+        } catch ( Throwable t ) {
+            logger.error("Error putting inflight queue message " + qmid + " queue name: " + queueName, t);
+
+            auditLogSerialization.recordAuditLog(
+                    AuditLog.Action.GET,
+                    AuditLog.Status.ERROR,
+                    queueName,
+                    actorSystemFig.getRegionLocal(),
+                    queueMessage.getMessageId(),
+                    qmid);
+
+            return false;
+        }
+
+        auditLogSerialization.recordAuditLog(
+                AuditLog.Action.GET,
+                AuditLog.Status.SUCCESS,
+                queueName,
+                actorSystemFig.getRegionLocal(),
+                queueMessage.getMessageId(),
+                qmid);
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
new file mode 100644
index 0000000..97e591c
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.routing.ConsistentHashingRouter;
+import akka.routing.FromConfig;
+import org.apache.usergrid.persistence.qakka.distributed.messages.*;
+
+
+/**
+ * Use consistent hashing to route messages to QueueActors
+ */
+public class QueueActorRouter extends UntypedActor {
+
+    private final ActorRef routerRef;
+
+
+    public QueueActorRouter() {
+        routerRef = getContext().actorOf(
+                FromConfig.getInstance().props( Props.create(QueueActor.class)), "router");
+    }
+
+    @Override
+    public void onReceive(Object message) {
+
+        // TODO: can we do something smarter than this if-then-else structure
+        // e.g. if message is recognized as one of ours, then we just pass it on?
+
+        if ( message instanceof QueueGetRequest) {
+            QueueGetRequest qgr = (QueueGetRequest) message;
+
+            ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
+                    new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qgr.getQueueName() );
+            routerRef.tell( envelope, getSender() );
+
+        } else if ( message instanceof QueueAckRequest) {
+            QueueAckRequest qar = (QueueAckRequest)message;
+
+            ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
+                    new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
+            routerRef.tell( envelope, getSender());
+
+        } else if ( message instanceof QueueInitRequest) {
+            QueueInitRequest qar = (QueueInitRequest)message;
+
+            ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
+                    new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
+            routerRef.tell( envelope, getSender());
+
+        } else if ( message instanceof QueueRefreshRequest) {
+            QueueRefreshRequest qar = (QueueRefreshRequest)message;
+
+            ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
+                    new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
+            routerRef.tell( envelope, getSender());
+
+        } else if ( message instanceof QueueTimeoutRequest) {
+            QueueTimeoutRequest qar = (QueueTimeoutRequest)message;
+
+            ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
+                    new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
+            routerRef.tell( envelope, getSender());
+
+        } else if ( message instanceof ShardCheckRequest) {
+            ShardCheckRequest qar = (ShardCheckRequest)message;
+
+            ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
+                    new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
+            routerRef.tell( envelope, getSender());
+
+        } else {
+            unhandled(message);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
new file mode 100644
index 0000000..03ab1ec
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import akka.actor.UntypedActor;
+import com.codahale.metrics.Timer;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.UUID;
+
+
+public class QueueRefresher extends UntypedActor {
+    private static final Logger logger = LoggerFactory.getLogger( QueueRefresher.class );
+
+    private final String                    queueName;
+
+    private final QueueMessageSerialization serialization;
+    private final InMemoryQueue             inMemoryQueue;
+    private final QakkaFig qakkaFig;
+    private final ActorSystemFig            actorSystemFig;
+    private final MetricsService            metricsService;
+    private final CassandraClient cassandraClient;
+
+    public QueueRefresher(String queueName ) {
+        this.queueName = queueName;
+
+        Injector injector = App.INJECTOR;
+
+        serialization  = injector.getInstance( QueueMessageSerialization.class );
+        inMemoryQueue  = injector.getInstance( InMemoryQueue.class );
+        qakkaFig       = injector.getInstance( QakkaFig.class );
+        actorSystemFig = injector.getInstance( ActorSystemFig.class );
+        metricsService = injector.getInstance( MetricsService.class );
+        cassandraClient = injector.getInstance( CassandraClientImpl.class );
+    }
+
+
+    @Override
+    public void onReceive(Object message) {
+
+        if ( message instanceof QueueRefreshRequest ) {
+
+            QueueRefreshRequest request = (QueueRefreshRequest) message;
+
+            if (!request.getQueueName().equals( queueName )) {
+                throw new QakkaRuntimeException(
+                        "QueueWriter for " + queueName + ": Incorrect queueName " + request.getQueueName() );
+            }
+
+            Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time();
+
+            try {
+
+                if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) {
+
+                    ShardIterator shardIterator = new ShardIterator(
+                            cassandraClient, queueName, actorSystemFig.getRegionLocal(),
+                            Shard.Type.DEFAULT, Optional.empty() );
+
+                    UUID since = inMemoryQueue.getNewest( queueName );
+
+                    String region = actorSystemFig.getRegionLocal();
+                    MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
+                            cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT,
+                            shardIterator, since);
+
+                    int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName );
+                    int count = 0;
+
+                    while ( multiShardIterator.hasNext() && count < need ) {
+                        DatabaseQueueMessage queueMessage = multiShardIterator.next();
+                        inMemoryQueue.add( queueName, queueMessage );
+                        count++;
+                    }
+
+                    if ( count > 0 ) {
+                        logger.debug( "Added {} in-memory for queue {}, new size = {}",
+                                count, queueName, inMemoryQueue.size( queueName ) );
+                    }
+                }
+
+            } finally {
+                timer.close();
+            }
+
+        } else {
+            unhandled( message );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
new file mode 100644
index 0000000..8bd733b
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.UntypedActor;
+import akka.cluster.client.ClusterClient;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.codahale.metrics.Timer;
+import com.google.inject.Injector;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendRequest;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendResponse;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResponse;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaException;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+
+public class QueueSender extends UntypedActor {
+    private static final Logger logger = LoggerFactory.getLogger( QueueSender.class );
+
+    private final String name = RandomStringUtils.randomAlphanumeric( 4 );
+
+    private final ActorSystemManager        actorSystemManager;
+    private final TransferLogSerialization  transferLogSerialization;
+    private final AuditLogSerialization     auditLogSerialization;
+    private final ActorSystemFig            actorSystemFig;
+    private final QakkaFig qakkaFig;
+    private final MetricsService            metricsService;
+
+    public QueueSender() {
+
+        Injector injector = App.INJECTOR;
+
+        actorSystemManager       = injector.getInstance( ActorSystemManager.class );
+        transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
+        auditLogSerialization    = injector.getInstance( AuditLogSerialization.class );
+        actorSystemFig           = injector.getInstance( ActorSystemFig.class );
+        qakkaFig                 = injector.getInstance( QakkaFig.class );
+        metricsService           = injector.getInstance( MetricsService.class );
+    }
+
+    @Override
+    public void onReceive(Object message) {
+
+        if ( message instanceof QueueSendRequest) {
+            QueueSendRequest qa =  (QueueSendRequest) message;
+
+            // as far as caller is concerned, we are done.
+            getSender().tell( new QueueSendResponse(
+                    DistributedQueueService.Status.SUCCESS ), getSender() );
+
+            final QueueWriter.WriteStatus writeStatus = sendMessageToRegion(
+                    qa.getQueueName(),
+                    qa.getSourceRegion(),
+                    qa.getDestRegion(),
+                    qa.getMessageId(),
+                    qa.getDeliveryTime(),
+                    qa.getExpirationTime() );
+
+            logResponse( writeStatus, qa.getQueueName(), qa.getDestRegion(), qa.getMessageId() );
+
+        } else {
+            unhandled( message );
+        }
+    }
+
+
+    QueueWriter.WriteStatus sendMessageToRegion(
+
+            String queueName,
+            String sourceRegion,
+            String destRegion,
+            UUID messageId,
+            Long deliveryTime,
+            Long expirationTime ) {
+
+        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_SEND ).time();
+        try {
+
+            int maxRetries = qakkaFig.getMaxSendRetries();
+            int retries = 0;
+
+            QueueWriteRequest request = new QueueWriteRequest(
+                    queueName, sourceRegion, destRegion, messageId, deliveryTime, expirationTime );
+
+            while (retries++ < maxRetries) {
+                try {
+                    Timeout t = new Timeout( qakkaFig.getSendTimeoutSeconds(), TimeUnit.SECONDS );
+
+                    Future<Object> fut;
+
+                    if (actorSystemManager.getCurrentRegion().equals( destRegion )) {
+
+                        // send to current region via local clientActor
+                        ActorRef clientActor = actorSystemManager.getClientActor();
+                        fut = Patterns.ask( clientActor, request, t );
+
+                    } else {
+
+                        // send to remote region via cluster client for that region
+                        ActorRef clusterClient = actorSystemManager.getClusterClient( destRegion );
+                        fut = Patterns.ask(
+                                clusterClient, new ClusterClient.Send( "/user/clientActor", request ), t );
+                    }
+
+                    // wait for response...
+                    final Object response = Await.result( fut, t.duration() );
+
+                    if (response != null && response instanceof QueueWriteResponse) {
+                        QueueWriteResponse qarm = (QueueWriteResponse) response;
+                        if (!QueueWriter.WriteStatus.ERROR.equals( qarm.getSendStatus() )) {
+
+                            if (retries > 1) {
+                                logger.debug( "queueAdd TOTAL_SUCCESS after {} retries", retries );
+                            }
+                            return qarm.getSendStatus();
+
+                        } else {
+                            logger.debug( "ERROR STATUS adding to queue, retrying {}", retries );
+                        }
+
+                    } else if (response != null) {
+                        logger.debug( "NULL RESPONSE adding to queue, retrying {}", retries );
+
+                    } else {
+                        logger.debug( "TIMEOUT adding to queue, retrying {}", retries );
+                    }
+
+                } catch (Exception e) {
+                    logger.debug( "ERROR adding to queue, retrying " + retries, e );
+                }
+            }
+
+            throw new QakkaRuntimeException( "Error adding to queue after " + retries );
+
+        } finally {
+            timer.stop();
+        }
+    }
+
+
+    void logResponse( QueueWriter.WriteStatus writeStatus, String queueName, String region, UUID messageId ) {
+
+        if ( writeStatus != null
+                && writeStatus.equals( QueueWriter.WriteStatus.ERROR ) ) {
+
+            logger.debug( "ERROR status sending message: {}, {}, {}, {}",
+                    new Object[]{queueName, actorSystemFig.getRegionLocal(), region, messageId} );
+
+            auditLogSerialization.recordAuditLog(
+                    AuditLog.Action.SEND,
+                    AuditLog.Status.ERROR,
+                    queueName,
+                    region,
+                    messageId,
+                    null);
+
+        } else if ( writeStatus != null
+                && writeStatus.equals( QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ) ) {
+
+            //logger.debug( "Delivery Success, now removing transfer log: {}, {}, {}, {}",
+            //        new Object[]{queueName, actorSystemFig.getRegionLocal(), region, messageId} );
+
+            // queue actor failed to clean up transfer log
+            try {
+                transferLogSerialization.removeTransferLog(
+                        queueName, actorSystemFig.getRegionLocal(), region, messageId );
+
+            } catch (QakkaException se) {
+                logger.error( "Unable to delete remove transfer log for {}, {}, {}, {}",
+                        new Object[]{queueName, actorSystemFig.getRegionLocal(), region, messageId} );
+                logger.debug( "Unable to delete remove transfer log exception is:", se );
+            }
+
+        } else if ( writeStatus != null
+                && writeStatus.equals( QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED ) ) {
+
+            //logger.debug( "Delivery Success: {}, {}, {}, {}",
+            //        new Object[]{queueName, actorSystemFig.getRegionLocal(), region, messageId} );
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
new file mode 100644
index 0000000..20603a5
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.routing.FromConfig;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendRequest;
+
+
+/**
+ * Route messages to QueueWriters
+ */
+public class QueueSenderRouter extends UntypedActor {
+
+    private final ActorRef router;
+
+
+    public QueueSenderRouter() {
+
+        router = getContext().actorOf(
+                FromConfig.getInstance().props(Props.create(QueueSender.class )), "router");
+    }
+
+    @Override
+    public void onReceive(Object message) {
+
+        if ( message instanceof QueueSendRequest) {
+            router.tell( message, getSender() );
+
+        } else {
+            unhandled(message);
+        }
+    }
+}