You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/16 22:34:41 UTC
[12/25] usergrid git commit: Initial integration of Qakka into
Usergrid Queue module,
and implementation of Qakka-based LegacyQueueManager implementation.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QakkaUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QakkaUtils.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QakkaUtils.java
new file mode 100644
index 0000000..9fe5c6e
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QakkaUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core;
+
+import com.datastax.driver.core.utils.UUIDs;
+
+import java.util.UUID;
+
+public class QakkaUtils {
+
+ public static UUID getTimeUuid() {
+ return UUIDs.timeBased();
+ }
+
+ public static Boolean isTimeUuid(UUID uuid) {
+ return uuid.version() == 1;
+ }
+
+ public static Boolean isNullOrEmpty(String s) {
+ return (s == null || s.equals(""));
+ }
+
+ public static UUID getTimeUUID(long when) {
+ return UUIDs.startOf( when );
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/Queue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/Queue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/Queue.java
new file mode 100644
index 0000000..e3d2790
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/Queue.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core;
+
+import org.apache.usergrid.persistence.qakka.serialization.queues.DatabaseQueue;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement
+public class Queue {
+
+ // will eventually control these via properties file
+ private static final Integer defaultRetryCount = 3;
+ private static final Integer defaultHandlingTimeoutSec = 30;
+ private static final String defaultDeadLetterQueueExtension = "_DLQ";
+
+ private String name;
+ private String queueType;
+ private String regions;
+ private String defaultDestinations;
+ private Long defaultDelayMs;
+ private Integer retryCount;
+ private Integer handlingTimeoutSec;
+ private String deadLetterQueue;
+
+ public Queue() {} // Jackson needs no-arg ctor
+
+ public Queue(String name, String queueType, String regions, String defaultDestinations, Long defaultDelayMs,
+ Integer retryCount, Integer handlingTimeoutSec, String deadLetterQueue) {
+ this.name = name;
+ this.queueType = queueType;
+ this.regions = regions;
+ this.defaultDestinations = defaultDestinations;
+ this.defaultDelayMs = defaultDelayMs;
+ this.retryCount = retryCount;
+ this.handlingTimeoutSec = handlingTimeoutSec;
+ this.deadLetterQueue = deadLetterQueue;
+ }
+
+ public Queue(String name, String queueType, String regions, String defaultDestinations, Long defaultDelayMs) {
+ this(name, queueType, regions, defaultDestinations, defaultDelayMs, defaultRetryCount,
+ defaultHandlingTimeoutSec, name + defaultDeadLetterQueueExtension);
+ }
+
+ public Queue(String name, String queueType, String regions, String defaultDestinations) {
+ this(name, queueType, regions, defaultDestinations, 0L, defaultRetryCount,
+ defaultHandlingTimeoutSec, name + defaultDeadLetterQueueExtension);
+ }
+
+ public Queue(String name) {
+ this(name, QueueType.MULTIREGION, Regions.LOCAL, Regions.LOCAL, 0L, defaultRetryCount,
+ defaultHandlingTimeoutSec, name + defaultDeadLetterQueueExtension);
+ }
+
+ public Queue(DatabaseQueue databaseQueue) {
+ this( databaseQueue.getName(),
+ QueueType.MULTIREGION,
+ databaseQueue.getRegions(),
+ databaseQueue.getDefaultDestinations(),
+ databaseQueue.getDefaultDelayMs(),
+ databaseQueue.getRetryCount(),
+ databaseQueue.getHandlingTimeoutSec(),
+ databaseQueue.getDeadLetterQueue());
+ }
+
+ public String getName() {
+ return name;
+ }
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getQueueType() {
+ return queueType;
+ }
+
+ public String getRegions() {
+ return regions;
+ }
+ public void setRegions(String regions) {
+ this.regions = regions;
+ }
+
+ public String getDefaultDestinations() {
+ return defaultDestinations;
+ }
+ public void setDefaultDestinations(String defaultDestinations) {
+ this.defaultDestinations = defaultDestinations;
+ }
+
+ public Long getDefaultDelayMs() {
+ return defaultDelayMs;
+ }
+ public void setDefaultDelayMs(Long defaultDelayMs) {
+ this.defaultDelayMs = defaultDelayMs;
+ }
+
+ public Integer getRetryCount() {
+ return retryCount;
+ }
+ public void setRetryCount(Integer retryCount) {
+ this.retryCount = retryCount;
+ }
+
+ public Integer getHandlingTimeoutSec() {
+ return handlingTimeoutSec;
+ }
+ public void setHandlingTimeoutSec(Integer handlingTimeoutSec) {
+ this.handlingTimeoutSec = handlingTimeoutSec;
+ }
+
+ public String getDeadLetterQueue() {
+ return deadLetterQueue;
+ }
+ public void setDeadLetterQueue(String deadLetterQueue) {
+ this.deadLetterQueue = deadLetterQueue;
+ }
+
+ public DatabaseQueue toDatabaseQueue() {
+ return new DatabaseQueue(
+ name,
+ regions,
+ defaultDestinations,
+ defaultDelayMs,
+ retryCount,
+ handlingTimeoutSec,
+ deadLetterQueue);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueManager.java
new file mode 100644
index 0000000..478fa12
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueManager.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core;
+
+import java.util.List;
+
+public interface QueueManager {
+
+ void createQueue(Queue queue);
+
+ void updateQueueConfig(Queue queue);
+
+ void deleteQueue(String queueName);
+
+ Queue getQueueConfig(String queueName);
+
+ List<String> getListOfQueues();
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessage.java
new file mode 100644
index 0000000..e79a241
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessage.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+import java.util.UUID;
+
+@XmlRootElement
+public class QueueMessage implements Serializable {
+
+ private UUID queueMessageId;
+ private UUID messageId;
+
+ private String queueName;
+ private String sendingRegion;
+ private String receivingRegion;
+
+ private Long delayUntilDate;
+ private Long expirationDate;
+ private Long createDate;
+ private Long retries;
+
+ private Boolean dataReceived;
+
+ /** MIME content type of data */
+ private String contentType;
+
+ /** If contentType is application/json then data will be the JSON payload for this queue message */
+ private String data;
+
+ /** If contentType is not then href will be the URL where the payload may be fetched */
+ private String href;
+
+
+ public QueueMessage() {} // for Jackson
+
+ public QueueMessage(
+ UUID queueMessageId, String queueName, String sendingRegion, String receivingRegion, UUID messageId,
+ Long delayUntilDate, Long expirationDate, Long createDate, Long retries, Boolean dataReceived) {
+
+ if (queueMessageId == null) {
+ this.queueMessageId = QakkaUtils.getTimeUuid();
+ } else {
+ this.queueMessageId = queueMessageId;
+ }
+ this.queueName = queueName;
+ this.sendingRegion = sendingRegion;
+ this.receivingRegion = receivingRegion;
+ this.messageId = messageId;
+ this.delayUntilDate = delayUntilDate;
+ this.expirationDate = expirationDate;
+ this.createDate = createDate;
+ this.retries = retries;
+ this.dataReceived = dataReceived;
+ }
+
+ public UUID getQueueMessageId() {
+ return queueMessageId;
+ }
+
+ public void setQueueMessageId(UUID queueMessageId) {
+ this.queueMessageId = queueMessageId;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public String getSendingRegion() {
+ return sendingRegion;
+ }
+
+ public String getReceivingRegion() {
+ return receivingRegion;
+ }
+
+ public UUID getMessageId() {
+ return messageId;
+ }
+
+ public Long getDelayUntilDate() {
+ return delayUntilDate;
+ }
+
+ public Long getDelayUntilMs() {
+ if ( delayUntilDate == null ) {
+ return null;
+ }
+ return delayUntilDate - System.currentTimeMillis();
+ }
+
+ public void setDelayUntilDate(Long delayUntilDate) {
+ this.delayUntilDate = delayUntilDate;
+ }
+
+ public void setDelayUntilMs(Long delayMs) {
+ this.delayUntilDate = System.currentTimeMillis() + delayMs;
+ }
+
+ public Long getExpirationDate() {
+ return expirationDate;
+ }
+
+ public Long getExpirationMs() {
+ if ( expirationDate == null ) {
+ return null;
+ }
+ return expirationDate - System.currentTimeMillis();
+ }
+
+ public void setExpirationDate(Long expirationDate) {
+ this.expirationDate = expirationDate;
+ }
+
+ public void setExpirationMs(Long expirationMs) {
+ this.expirationDate = System.currentTimeMillis() + expirationMs;
+ }
+
+ public Long getCreateDate() {
+ return createDate;
+ }
+
+ public void setCreateDate(Long createDate) {
+ this.createDate = createDate;
+ }
+
+ public Long getRetries() {
+ return retries;
+ }
+
+ public void setRetries(Long retries) {
+ this.retries = retries;
+ }
+
+ public Boolean getDataReceived() {
+ return dataReceived;
+ }
+
+ public void setDataReceived(Boolean dataReceived) {
+ this.dataReceived = dataReceived;
+ }
+
+
+ public String getData() {
+ return data;
+ }
+
+ public void setData(String data) {
+ this.data = data;
+ }
+
+ public String getContentType() {
+ return contentType;
+ }
+
+ public void setContentType(String contentType) {
+ this.contentType = contentType;
+ }
+
+ public String getHref() {
+ return href;
+ }
+
+ public void setHref(String href) {
+ this.href = href;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
new file mode 100644
index 0000000..15203d8
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+
+
+public interface QueueMessageManager {
+
+ /**
+ * Send Queue Message to one or more destination regions.
+ * @param queueName Name of queue
+ * @param destinationRegions List of destination regions
+ * @param delayMs Delay before sending queue message
+ * @param expirationSecs Time before message expires
+ * @param contentType Content type of message data
+ * @param messageData Message content
+ */
+ void sendMessages(String queueName, List<String> destinationRegions,
+ Long delayMs, Long expirationSecs, String contentType, ByteBuffer messageData);
+
+ /**
+ * Get next available messages from the specified queue.
+ *
+ * @param queueName Name of queue
+ * @param count Number of messages to get
+ * @return List of next messages, empty if non-available
+ */
+ List<QueueMessage> getNextMessages(String queueName, int count);
+
+ /**
+ * Acknowledge that message has been received and is no longer inflight.
+ *
+ * @param queueName Name of queue
+ * @param queueMessageId ID of queue message
+ */
+ void ackMessage(String queueName, UUID queueMessageId);
+
+ /**
+ * Put message back in the queue.
+ *
+ * @param queueName Name of the queue
+ * @param messageId ID of the queue message
+ * @param delayMs Delay before re-queueing message
+ */
+ void requeueMessage(String queueName, UUID messageId, Long delayMs);
+
+ /**
+ * Clear all messages from queue
+ *
+ * @param queueName Name of queue
+ */
+ void clearMessages(String queueName);
+
+ /**
+ * Get message payload data.
+ */
+ ByteBuffer getMessageData(UUID messageId);
+
+ /**
+ * Get message from messages available or messages inflight storage.
+ */
+ QueueMessage getMessage(String queueName, UUID queueMessageId);
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueType.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueType.java
new file mode 100644
index 0000000..f0f59e5
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core;
+
+
+public class QueueType {
+ public static final String LOCAL = "_LOCAL";
+ public static final String MULTIREGION = "_MULTIREGION";
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/Regions.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/Regions.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/Regions.java
new file mode 100644
index 0000000..3097e92
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/Regions.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+
+@Singleton
+public class Regions {
+ public static final String LOCAL = "LOCAL";
+ public static final String ALL = "ALL";
+ public static final String REMOTE = "REMOTE";
+
+ // load regions from properties
+ String localRegion;
+ List<String> regionList;
+
+
+ @Inject
+ public Regions( ActorSystemFig actorSystemFig ) {
+ localRegion = actorSystemFig.getRegionLocal();
+ regionList = Arrays.asList( actorSystemFig.getRegionsList().split(","));
+ }
+
+
+ public List<String> getRegions(String region) {
+ List<String> ret = null;
+
+ switch (region) {
+ case ALL:
+ ret = new ArrayList<>(regionList);
+ break;
+ case LOCAL:
+ ret = Collections.singletonList(localRegion);
+ break;
+ case REMOTE:
+ ret = new ArrayList<>(regionList);
+ ret.remove(localRegion);
+ break;
+ default:
+ // parse regions into list -- assume a single region now, but can do region1,region2
+
+ // validate regions
+
+ ret = Collections.singletonList(region);
+ break;
+ }
+
+ return ret;
+ }
+
+ public String getLocalRegion() {
+ return localRegion;
+ }
+
+ public Boolean isValidRegion(String region) {
+ return regionList.contains(region);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
new file mode 100644
index 0000000..474ef5c
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core.impl;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.distributed.actors.QueueRefresher;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+
+@Singleton
+public class InMemoryQueue {
+ private static final Logger logger = LoggerFactory.getLogger( QueueRefresher.class );
+
+ private final Map<String, Queue<DatabaseQueueMessage>> queuesByName;
+ private final Map<String, UUID> newestByQueueName;
+
+
+ @Inject
+ InMemoryQueue(QakkaFig qakkaFig) {
+ queuesByName = new HashMap<>( qakkaFig.getQueueInMemorySize() );
+ newestByQueueName = new HashMap<>( qakkaFig.getQueueInMemorySize() );
+ }
+
+ private Queue<DatabaseQueueMessage> getQueue( String queueName ) {
+ synchronized ( queuesByName ) {
+ if ( !queuesByName.containsKey( queueName )) {
+ queuesByName.put( queueName, new ConcurrentLinkedQueue<>() );
+ }
+ return queuesByName.get( queueName );
+ }
+ }
+
+ public void add( String queueName, DatabaseQueueMessage databaseQueueMessage ) {
+ UUID newest = newestByQueueName.get( queueName );
+ if ( newest == null ) {
+ newest = databaseQueueMessage.getQueueMessageId();
+ } else {
+ if ( databaseQueueMessage.getQueueMessageId().compareTo( newest ) > 0 ) {
+ newest = databaseQueueMessage.getQueueMessageId();
+ }
+ }
+ newestByQueueName.put( queueName, newest );
+ getQueue( queueName ).add( databaseQueueMessage );
+ }
+
+ public UUID getNewest( String queueName ) {
+ return newestByQueueName.get( queueName );
+ }
+
+ public DatabaseQueueMessage poll( String queueName ) {
+ return getQueue( queueName ).poll();
+ }
+
+ public int size( String queueName ) {
+ return getQueue( queueName ).size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
new file mode 100644
index 0000000..bbb46a8
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core.impl;
+
+import com.google.inject.Inject;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.apache.usergrid.persistence.qakka.core.Queue;
+import org.apache.usergrid.persistence.qakka.core.QueueManager;
+import org.apache.usergrid.persistence.qakka.core.Regions;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.serialization.queues.DatabaseQueue;
+import org.apache.usergrid.persistence.qakka.serialization.queues.QueueSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class QueueManagerImpl implements QueueManager {
+ private final ActorSystemFig actorSystemFig;
+ private final QueueSerialization queueSerialization;
+ private final DistributedQueueService distributedQueueService;
+ private final ShardSerialization shardSerialization;
+
+
+ @Inject
+ public QueueManagerImpl(
+ ActorSystemFig actorSystemFig,
+ QueueSerialization queueSerialization,
+ DistributedQueueService distributedQueueService,
+ ShardSerialization shardSerialization ) {
+
+ this.actorSystemFig = actorSystemFig;
+ this.queueSerialization = queueSerialization;
+ this.distributedQueueService = distributedQueueService;
+ this.shardSerialization = shardSerialization;
+ }
+
+ @Override
+ public void createQueue(Queue queue) {
+
+ queueSerialization.writeQueue(queue.toDatabaseQueue());
+
+ List<String> regions = new ArrayList<>();
+
+ if ( Regions.LOCAL.equals( queue.getRegions() ) || StringUtils.isEmpty( queue.getRegions() ) ) {
+ regions.add( actorSystemFig.getRegionLocal() );
+
+ } else if ( Regions.ALL.equals( queue.getRegions() )) {
+ for ( String region : actorSystemFig.getRegionsList().split(",")) {
+ regions.add( region );
+ }
+
+ } else {
+ for (String region : queue.getRegions().split( "," )) {
+ regions.add( region );
+ }
+ }
+
+ for ( String region : regions ) {
+
+ Shard available = new Shard( queue.getName(), region, Shard.Type.DEFAULT, 1L, QakkaUtils.getTimeUuid());
+ shardSerialization.createShard( available );
+
+ Shard inflight = new Shard( queue.getName(), region, Shard.Type.INFLIGHT, 1L, QakkaUtils.getTimeUuid());
+ shardSerialization.createShard( inflight );
+ }
+
+ distributedQueueService.initQueue( queue.getName() );
+ distributedQueueService.refreshQueue( queue.getName() );
+ }
+
+ @Override
+ public void updateQueueConfig(Queue queue) {
+
+ queueSerialization.writeQueue(queue.toDatabaseQueue());
+
+ distributedQueueService.initQueue( queue.getName() );
+ distributedQueueService.refreshQueue( queue.getName() );
+ }
+
+ @Override
+ public void deleteQueue(String queueName) {
+
+ queueSerialization.deleteQueue(queueName);
+
+ // TODO: implement delete queue for Akka, stop schedulers, etc.
+ //qas.deleteQueue(queueName);
+ }
+
+ @Override
+ public Queue getQueueConfig(String queueName) {
+
+ DatabaseQueue databaseQueue = queueSerialization.getQueue(queueName);
+ if ( databaseQueue != null ) {
+ return new Queue( databaseQueue );
+ }
+ return null;
+ }
+
+ @Override
+ public List<String> getListOfQueues() {
+ return queueSerialization.getListOfQueues();
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
new file mode 100644
index 0000000..bcd0f58
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core.impl;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.api.URIStrategy;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.apache.usergrid.persistence.qakka.core.QueueManager;
+import org.apache.usergrid.persistence.qakka.core.QueueMessage;
+import org.apache.usergrid.persistence.qakka.core.QueueMessageManager;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.exceptions.BadRequestException;
+import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+
+@Singleton
+public class QueueMessageManagerImpl implements QueueMessageManager {
+
+ private static final Logger logger = LoggerFactory.getLogger( QueueMessageManagerImpl.class );
+
+ private final ActorSystemFig actorSystemFig;
+ private final QueueManager queueManager;
+ private final QueueMessageSerialization queueMessageSerialization;
+ private final DistributedQueueService distributedQueueService;
+ private final TransferLogSerialization transferLogSerialization;
+ private final URIStrategy uriStrategy;
+
+
+ @Inject
+ public QueueMessageManagerImpl(
+ ActorSystemFig actorSystemFig,
+ QueueManager queueManager,
+ QueueMessageSerialization queueMessageSerialization,
+ DistributedQueueService distributedQueueService,
+ TransferLogSerialization transferLogSerialization,
+ URIStrategy uriStrategy
+ ) {
+
+ this.actorSystemFig = actorSystemFig;
+ this.queueManager = queueManager;
+ this.queueMessageSerialization = queueMessageSerialization;
+ this.distributedQueueService = distributedQueueService;
+ this.transferLogSerialization = transferLogSerialization;
+ this.uriStrategy = uriStrategy;
+ }
+
+
+ @Override
+ public void sendMessages(String queueName, List<String> destinationRegions,
+ Long delayMs, Long expirationSecs, String contentType, ByteBuffer messageData) {
+
+ // TODO: implement delay and expiration
+
+// Preconditions.checkArgument(delayMs == null || delayMs > 0L,
+// "Delay milliseconds must be greater than zero");
+// Preconditions.checkArgument(expirationSecs == null || expirationSecs > 0L,
+// "Expiration seconds must be greater than zero");
+
+ if ( queueManager.getQueueConfig( queueName ) == null ) {
+ throw new NotFoundException( "Queue not found: " + queueName );
+ }
+
+ // get current time
+ Long currentTimeMs = System.currentTimeMillis();
+
+ // create message id
+ UUID messageId = QakkaUtils.getTimeUuid();
+
+ Long deliveryTime = delayMs != null ? currentTimeMs + delayMs : null;
+ Long expirationTime = expirationSecs != null ? currentTimeMs + (1000 * expirationSecs) : null;
+
+ // write message data to C*
+ queueMessageSerialization.writeMessageData(
+ messageId, new DatabaseQueueMessageBody(messageData, contentType));
+
+ for (String region : destinationRegions) {
+
+ transferLogSerialization.recordTransferLog(
+ queueName, actorSystemFig.getRegionLocal(), region, messageId );
+
+ // send message to destination region's queue
+ DistributedQueueService.Status status = null;
+ try {
+ status = distributedQueueService.sendMessageToRegion(
+ queueName,
+ actorSystemFig.getRegionLocal(),
+ region,
+ messageId,
+ deliveryTime,
+ expirationTime );
+
+ //logger.debug("Send message to queueName {} in region {}", queueName, region );
+
+ } catch ( QakkaRuntimeException qae ) {
+ logger.error("Error sending message " + messageId + " to " + region, qae);
+ }
+ }
+ }
+
+
+ @Override
+ public List<QueueMessage> getNextMessages(String queueName, int count) {
+
+ if ( queueManager.getQueueConfig( queueName ) == null ) {
+ throw new NotFoundException( "Queue not found: " + queueName );
+ }
+
+ Collection<DatabaseQueueMessage> dbMessages = distributedQueueService.getNextMessages( queueName, count );
+
+ List<QueueMessage> queueMessages = joinMessages( queueName, dbMessages );
+
+ if ( queueMessages.size() < count && queueMessages.size() < dbMessages.size() ) {
+ logger.debug("Messages failed to join for queue:{}, get more", queueName);
+
+ // some messages failed to join, get more
+ dbMessages = distributedQueueService.getNextMessages( queueName, count - queueMessages.size() );
+ queueMessages.addAll( joinMessages( queueName, dbMessages ) );
+ }
+
+ return queueMessages;
+ }
+
+
+ private List<QueueMessage> joinMessages( String queueName, Collection<DatabaseQueueMessage> dbMessages) {
+
+ List<QueueMessage> queueMessages = new ArrayList<>();
+
+ for (DatabaseQueueMessage dbMessage : dbMessages) {
+
+ DatabaseQueueMessageBody data = queueMessageSerialization.loadMessageData( dbMessage.getMessageId() );
+
+ if ( data != null ) {
+
+ QueueMessage queueMessage = new QueueMessage(
+ dbMessage.getQueueMessageId(),
+ queueName,
+ null, // sending region
+ dbMessage.getRegion(), // receiving region
+ dbMessage.getMessageId(),
+ null, // delay until date
+ null, // expiration date
+ dbMessage.getQueuedAt(),
+ null, // retries
+ true );
+
+ queueMessage.setContentType( data.getContentType() );
+ if ( "application/json".equals( data.getContentType() )) {
+ try {
+ String json = new String( data.getBlob().array(), "UTF-8");
+ queueMessage.setData( json );
+
+ } catch (UnsupportedEncodingException e) {
+ logger.error("Error unencoding data for messageId=" + queueMessage.getMessageId(), e);
+ }
+ } else {
+ try {
+ queueMessage.setHref( uriStrategy.queueMessageDataURI(
+ queueName, queueMessage.getQueueMessageId()).toString());
+
+ } catch (URISyntaxException e) {
+ throw new QakkaRuntimeException( "Error forming URI for message data", e );
+ }
+ }
+
+ queueMessages.add( queueMessage );
+ }
+ }
+
+ return queueMessages;
+ }
+
+
+ @Override
+ public void ackMessage(String queueName, UUID queueMessageId) {
+
+ if ( queueManager.getQueueConfig( queueName ) == null ) {
+ throw new NotFoundException( "Queue not found: " + queueName );
+ }
+
+ DistributedQueueService.Status status = distributedQueueService.ackMessage( queueName, queueMessageId );
+
+ if ( DistributedQueueService.Status.BAD_REQUEST.equals( status )) {
+ throw new BadRequestException( "Message not inflight" );
+
+ } else if ( DistributedQueueService.Status.ERROR.equals( status )) {
+ throw new QakkaRuntimeException( "Unable to ack message due to error" );
+ }
+ }
+
+
+ @Override
+ public void requeueMessage(String queueName, UUID messageId, Long delayMs) {
+
+ if ( queueManager.getQueueConfig( queueName ) == null ) {
+ throw new NotFoundException( "Queue not found: " + queueName );
+ }
+
+ // TODO: implement requeueMessage
+
+ throw new UnsupportedOperationException( "requeueMessage not yet implemented" );
+ }
+
+
+ @Override
+ public void clearMessages(String queueName) {
+
+ if ( queueManager.getQueueConfig( queueName ) == null ) {
+ throw new NotFoundException( "Queue not found: " + queueName );
+ }
+
+ // TODO: implement clearMessages
+
+ throw new UnsupportedOperationException( "clearMessages not yet implemented" );
+ }
+
+
+ @Override
+ public ByteBuffer getMessageData( UUID messageId ) {
+ DatabaseQueueMessageBody body = queueMessageSerialization.loadMessageData( messageId );
+ return body != null ? body.getBlob() : null;
+ }
+
+
+ /**
+ * Get but do not put inflight specified queue message, first looking in INFLIGHT table then DEFAULT.
+ */
+ @Override
+ public QueueMessage getMessage( String queueName, UUID queueMessageId ) {
+
+ QueueMessage queueMessage = null;
+
+ // first look in INFLIGHT storage
+
+
+ DatabaseQueueMessage dbMessage = queueMessageSerialization.loadMessage(
+ queueName, actorSystemFig.getRegionLocal(), null,
+ DatabaseQueueMessage.Type.INFLIGHT, queueMessageId );
+
+ if ( dbMessage == null ) {
+
+ // not found, so now look in DEFAULT storage
+
+ dbMessage = queueMessageSerialization.loadMessage(
+ queueName, actorSystemFig.getRegionLocal(), null,
+ DatabaseQueueMessage.Type.DEFAULT, queueMessageId );
+ }
+
+ if ( dbMessage != null ) {
+ queueMessage = new QueueMessage(
+ dbMessage.getQueueMessageId(),
+ queueName,
+ null, // sending region
+ dbMessage.getRegion(), // receiving region
+ dbMessage.getMessageId(),
+ null, // delay until date
+ null, // expiration date
+ dbMessage.getQueuedAt(),
+ null, // retries
+ true );
+ }
+
+ return queueMessage;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
new file mode 100644
index 0000000..c2ca6b1
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed;
+
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+
+import java.util.Collection;
+import java.util.UUID;
+
+
+/**
+ * Interface to distributed part of Qakka queue implementation.
+ */
+public interface DistributedQueueService {
+
+ enum Status { SUCCESS, ERROR, BAD_REQUEST };
+
+ void init();
+
+ void initQueue(String queueName);
+
+ void refresh();
+
+ void refreshQueue(String queueName);
+
+ void processTimeouts();
+
+ Status sendMessageToRegion(
+ String queueName,
+ String sourceRegion,
+ String destRegion,
+ UUID messageId,
+ Long deliveryTime,
+ Long expirationTime);
+
+ Collection<DatabaseQueueMessage> getNextMessages(String queueName, int numMessages);
+
+ Status ackMessage(String queueName, UUID messageId);
+
+ Status requeueMessage(String queueName, UUID messageId);
+
+ Status clearMessages(String queueName);
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
new file mode 100644
index 0000000..6ecffba
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import com.codahale.metrics.Timer;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.distributed.messages.*;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+
+public class QueueActor extends UntypedActor {
+ private static final Logger logger = LoggerFactory.getLogger( QueueActor.class );
+
+ private final QakkaFig qakkaFig;
+ private final InMemoryQueue inMemoryQueue;
+ private final QueueActorHelper queueActorHelper;
+ private final MetricsService metricsService;
+
+ private final Map<String, Cancellable> refreshSchedulersByQueueName = new HashMap<>();
+ private final Map<String, Cancellable> timeoutSchedulersByQueueName = new HashMap<>();
+ private final Map<String, Cancellable> shardAllocationSchedulersByQueueName = new HashMap<>();
+
+ private final Map<String, ActorRef> queueReadersByQueueName = new HashMap<>();
+ private final Map<String, ActorRef> queueTimeoutersByQueueName = new HashMap<>();
+ private final Map<String, ActorRef> shardAllocatorsByQueueName = new HashMap<>();
+
+
+ public QueueActor() {
+
+ Injector injector = App.INJECTOR;
+
+ qakkaFig = injector.getInstance( QakkaFig.class );
+ inMemoryQueue = injector.getInstance( InMemoryQueue.class );
+ queueActorHelper = injector.getInstance( QueueActorHelper.class );
+ metricsService = injector.getInstance( MetricsService.class );
+ }
+
+ @Override
+ public void onReceive(Object message) {
+
+ if ( message instanceof QueueInitRequest) {
+ QueueInitRequest request = (QueueInitRequest)message;
+
+ if ( refreshSchedulersByQueueName.get( request.getQueueName() ) == null ) {
+ Cancellable scheduler = getContext().system().scheduler().schedule(
+ Duration.create( 0, TimeUnit.MILLISECONDS),
+ Duration.create( qakkaFig.getQueueRefreshMilliseconds(), TimeUnit.MILLISECONDS),
+ self(),
+ new QueueRefreshRequest( request.getQueueName() ),
+ getContext().dispatcher(),
+ getSelf());
+ refreshSchedulersByQueueName.put( request.getQueueName(), scheduler );
+ }
+
+ if ( timeoutSchedulersByQueueName.get( request.getQueueName() ) == null ) {
+ Cancellable scheduler = getContext().system().scheduler().schedule(
+ Duration.create( 0, TimeUnit.MILLISECONDS),
+ Duration.create( qakkaFig.getQueueTimeoutSeconds()/2, TimeUnit.SECONDS),
+ self(),
+ new QueueTimeoutRequest( request.getQueueName() ),
+ getContext().dispatcher(),
+ getSelf());
+ timeoutSchedulersByQueueName.put( request.getQueueName(), scheduler );
+ }
+
+ if ( shardAllocationSchedulersByQueueName.get( request.getQueueName() ) == null ) {
+ Cancellable scheduler = getContext().system().scheduler().schedule(
+ Duration.create( 0, TimeUnit.MILLISECONDS),
+ Duration.create( qakkaFig.getShardAllocationCheckFrequencyMillis(), TimeUnit.MILLISECONDS),
+ self(),
+ new ShardCheckRequest( request.getQueueName() ),
+ getContext().dispatcher(),
+ getSelf());
+ shardAllocationSchedulersByQueueName.put( request.getQueueName(), scheduler );
+ }
+
+ } else if ( message instanceof QueueRefreshRequest ) {
+ QueueRefreshRequest request = (QueueRefreshRequest)message;
+
+ if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) {
+ ActorRef readerRef = getContext().actorOf( Props.create(
+ QueueRefresher.class, request.getQueueName()), request.getQueueName() + "_reader");
+ queueReadersByQueueName.put( request.getQueueName(), readerRef );
+ }
+
+ // hand-off to queue's reader
+ queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() );
+
+ } else if ( message instanceof QueueTimeoutRequest ) {
+ QueueTimeoutRequest request = (QueueTimeoutRequest)message;
+
+ if ( queueTimeoutersByQueueName.get( request.getQueueName() ) == null ) {
+ ActorRef readerRef = getContext().actorOf( Props.create(
+ QueueTimeouter.class, request.getQueueName()), request.getQueueName() + "_timeouter");
+ queueTimeoutersByQueueName.put( request.getQueueName(), readerRef );
+ }
+
+ // hand-off to queue's timeouter
+ queueTimeoutersByQueueName.get( request.getQueueName() ).tell( request, self() );
+
+
+ } else if ( message instanceof ShardCheckRequest ) {
+ ShardCheckRequest request = (ShardCheckRequest)message;
+
+ if ( shardAllocatorsByQueueName.get( request.getQueueName() ) == null ) {
+ ActorRef readerRef = getContext().actorOf( Props.create(
+ ShardAllocator.class, request.getQueueName()), request.getQueueName() + "_shard_allocator");
+ shardAllocatorsByQueueName.put( request.getQueueName(), readerRef );
+ }
+
+ // hand-off to queue's shard allocator
+ shardAllocatorsByQueueName.get( request.getQueueName() ).tell( request, self() );
+
+
+ } else if ( message instanceof QueueGetRequest) {
+
+ Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.GET_TIME_GET ).time();
+ try {
+ QueueGetRequest queueGetRequest = (QueueGetRequest) message;
+
+ Collection<DatabaseQueueMessage> queueMessages = new ArrayList<>();
+
+ while (queueMessages.size() < queueGetRequest.getNumRequested()) {
+
+ DatabaseQueueMessage queueMessage = inMemoryQueue.poll( queueGetRequest.getQueueName() );
+
+ if (queueMessage != null) {
+ if (queueActorHelper.putInflight( queueGetRequest.getQueueName(), queueMessage )) {
+ queueMessages.add( queueMessage );
+ }
+ } else {
+ logger.debug("in-memory queue for {} is empty, object is: {}",
+ queueGetRequest.getQueueName(), inMemoryQueue );
+ break;
+ }
+ }
+
+ getSender().tell( new QueueGetResponse(
+ DistributedQueueService.Status.SUCCESS, queueMessages ), getSender() );
+
+ } finally {
+ timer.close();
+ }
+
+
+ } else if ( message instanceof QueueAckRequest) {
+
+ Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ACK_TIME_ACK ).time();
+ try {
+
+ QueueAckRequest queueAckRequest = (QueueAckRequest) message;
+
+ DistributedQueueService.Status status = queueActorHelper.ackQueueMessage(
+ queueAckRequest.getQueueName(),
+ queueAckRequest.getQueueMessageId() );
+
+ getSender().tell( new QueueAckResponse(
+ queueAckRequest.getQueueName(),
+ queueAckRequest.getQueueMessageId(),
+ status ), getSender() );
+
+ } finally {
+ timer.close();
+ }
+
+ } else {
+ unhandled( message );
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
new file mode 100644
index 0000000..26db903
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+
+public class QueueActorHelper {
+ private static final Logger logger = LoggerFactory.getLogger( QueueActorHelper.class );
+
+ private final ActorSystemFig actorSystemFig;
+ private final QueueMessageSerialization messageSerialization;
+ private final AuditLogSerialization auditLogSerialization;
+
+
+ @Inject
+ public QueueActorHelper(
+ ActorSystemFig actorSystemFig,
+ QueueMessageSerialization messageSerialization,
+ AuditLogSerialization auditLogSerialization
+ ) {
+
+ this.actorSystemFig = actorSystemFig;
+ this.messageSerialization = messageSerialization;
+ this.auditLogSerialization = auditLogSerialization;
+ }
+
+
+ DatabaseQueueMessage loadDatabaseQueueMessage(
+ String queueName, UUID queueMessageId, DatabaseQueueMessage.Type type ) {
+
+ try {
+ return messageSerialization.loadMessage(
+ queueName,
+ actorSystemFig.getRegionLocal(),
+ null,
+ type,
+ queueMessageId );
+
+ } catch (Throwable t) {
+ logger.error( "Error reading queueMessage", t );
+ }
+
+ return null;
+ }
+
+
+ DistributedQueueService.Status ackQueueMessage(String queueName, UUID queueMessageId ) {
+
+ DatabaseQueueMessage queueMessage = loadDatabaseQueueMessage(
+ queueName, queueMessageId, DatabaseQueueMessage.Type.INFLIGHT );
+
+ if ( queueMessage == null ) {
+ return DistributedQueueService.Status.BAD_REQUEST;
+ }
+
+ boolean error = false;
+ try {
+ messageSerialization.deleteMessage(
+ queueName,
+ actorSystemFig.getRegionLocal(),
+ null,
+ DatabaseQueueMessage.Type.INFLIGHT,
+ queueMessageId );
+
+ } catch (Throwable t) {
+ logger.error( "Error deleting queueMessage for ack", t );
+ error = true;
+ }
+
+ if ( !error ) {
+
+ auditLogSerialization.recordAuditLog(
+ AuditLog.Action.ACK,
+ AuditLog.Status.SUCCESS,
+ queueName,
+ actorSystemFig.getRegionLocal(),
+ queueMessage.getMessageId(),
+ queueMessageId );
+
+ return DistributedQueueService.Status.SUCCESS;
+
+ } else {
+
+ auditLogSerialization.recordAuditLog(
+ AuditLog.Action.ACK,
+ AuditLog.Status.ERROR,
+ queueName,
+ actorSystemFig.getRegionLocal(),
+ queueMessage.getMessageId(),
+ queueMessageId );
+
+ return DistributedQueueService.Status.ERROR;
+ }
+ }
+
+
+ boolean putInflight( String queueName, DatabaseQueueMessage queueMessage ) {
+
+ UUID qmid = queueMessage.getQueueMessageId();
+ try {
+ queueMessage.setType( DatabaseQueueMessage.Type.INFLIGHT );
+ queueMessage.setShardId( null );
+ queueMessage.setInflightAt( System.currentTimeMillis() );
+ messageSerialization.writeMessage( queueMessage );
+
+ messageSerialization.deleteMessage(
+ queueName,
+ actorSystemFig.getRegionLocal(),
+ null,
+ DatabaseQueueMessage.Type.DEFAULT,
+ qmid);
+
+ //logger.debug("Put message {} inflight for queue name {}", qmid, queueName);
+
+ } catch ( Throwable t ) {
+ logger.error("Error putting inflight queue message " + qmid + " queue name: " + queueName, t);
+
+ auditLogSerialization.recordAuditLog(
+ AuditLog.Action.GET,
+ AuditLog.Status.ERROR,
+ queueName,
+ actorSystemFig.getRegionLocal(),
+ queueMessage.getMessageId(),
+ qmid);
+
+ return false;
+ }
+
+ auditLogSerialization.recordAuditLog(
+ AuditLog.Action.GET,
+ AuditLog.Status.SUCCESS,
+ queueName,
+ actorSystemFig.getRegionLocal(),
+ queueMessage.getMessageId(),
+ qmid);
+
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
new file mode 100644
index 0000000..97e591c
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.routing.ConsistentHashingRouter;
+import akka.routing.FromConfig;
+import org.apache.usergrid.persistence.qakka.distributed.messages.*;
+
+
+/**
+ * Use consistent hashing to route messages to QueueActors
+ */
+public class QueueActorRouter extends UntypedActor {
+
+ private final ActorRef routerRef;
+
+
+ public QueueActorRouter() {
+ routerRef = getContext().actorOf(
+ FromConfig.getInstance().props( Props.create(QueueActor.class)), "router");
+ }
+
+ @Override
+ public void onReceive(Object message) {
+
+ // TODO: can we do something smarter than this if-then-else structure
+ // e.g. if message is recognized as one of ours, then we just pass it on?
+
+ if ( message instanceof QueueGetRequest) {
+ QueueGetRequest qgr = (QueueGetRequest) message;
+
+ ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
+ new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qgr.getQueueName() );
+ routerRef.tell( envelope, getSender() );
+
+ } else if ( message instanceof QueueAckRequest) {
+ QueueAckRequest qar = (QueueAckRequest)message;
+
+ ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
+ new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
+ routerRef.tell( envelope, getSender());
+
+ } else if ( message instanceof QueueInitRequest) {
+ QueueInitRequest qar = (QueueInitRequest)message;
+
+ ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
+ new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
+ routerRef.tell( envelope, getSender());
+
+ } else if ( message instanceof QueueRefreshRequest) {
+ QueueRefreshRequest qar = (QueueRefreshRequest)message;
+
+ ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
+ new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
+ routerRef.tell( envelope, getSender());
+
+ } else if ( message instanceof QueueTimeoutRequest) {
+ QueueTimeoutRequest qar = (QueueTimeoutRequest)message;
+
+ ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
+ new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
+ routerRef.tell( envelope, getSender());
+
+ } else if ( message instanceof ShardCheckRequest) {
+ ShardCheckRequest qar = (ShardCheckRequest)message;
+
+ ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
+ new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
+ routerRef.tell( envelope, getSender());
+
+ } else {
+ unhandled(message);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
new file mode 100644
index 0000000..03ab1ec
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import akka.actor.UntypedActor;
+import com.codahale.metrics.Timer;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.UUID;
+
+
+public class QueueRefresher extends UntypedActor {
+ private static final Logger logger = LoggerFactory.getLogger( QueueRefresher.class );
+
+ private final String queueName;
+
+ private final QueueMessageSerialization serialization;
+ private final InMemoryQueue inMemoryQueue;
+ private final QakkaFig qakkaFig;
+ private final ActorSystemFig actorSystemFig;
+ private final MetricsService metricsService;
+ private final CassandraClient cassandraClient;
+
+ public QueueRefresher(String queueName ) {
+ this.queueName = queueName;
+
+ Injector injector = App.INJECTOR;
+
+ serialization = injector.getInstance( QueueMessageSerialization.class );
+ inMemoryQueue = injector.getInstance( InMemoryQueue.class );
+ qakkaFig = injector.getInstance( QakkaFig.class );
+ actorSystemFig = injector.getInstance( ActorSystemFig.class );
+ metricsService = injector.getInstance( MetricsService.class );
+ cassandraClient = injector.getInstance( CassandraClientImpl.class );
+ }
+
+
+ @Override
+ public void onReceive(Object message) {
+
+ if ( message instanceof QueueRefreshRequest ) {
+
+ QueueRefreshRequest request = (QueueRefreshRequest) message;
+
+ if (!request.getQueueName().equals( queueName )) {
+ throw new QakkaRuntimeException(
+ "QueueWriter for " + queueName + ": Incorrect queueName " + request.getQueueName() );
+ }
+
+ Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time();
+
+ try {
+
+ if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) {
+
+ ShardIterator shardIterator = new ShardIterator(
+ cassandraClient, queueName, actorSystemFig.getRegionLocal(),
+ Shard.Type.DEFAULT, Optional.empty() );
+
+ UUID since = inMemoryQueue.getNewest( queueName );
+
+ String region = actorSystemFig.getRegionLocal();
+ MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
+ cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT,
+ shardIterator, since);
+
+ int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName );
+ int count = 0;
+
+ while ( multiShardIterator.hasNext() && count < need ) {
+ DatabaseQueueMessage queueMessage = multiShardIterator.next();
+ inMemoryQueue.add( queueName, queueMessage );
+ count++;
+ }
+
+ if ( count > 0 ) {
+ logger.debug( "Added {} in-memory for queue {}, new size = {}",
+ count, queueName, inMemoryQueue.size( queueName ) );
+ }
+ }
+
+ } finally {
+ timer.close();
+ }
+
+ } else {
+ unhandled( message );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
new file mode 100644
index 0000000..8bd733b
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.UntypedActor;
+import akka.cluster.client.ClusterClient;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.codahale.metrics.Timer;
+import com.google.inject.Injector;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendRequest;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendResponse;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResponse;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaException;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+
+public class QueueSender extends UntypedActor {
+ private static final Logger logger = LoggerFactory.getLogger( QueueSender.class );
+
+ private final String name = RandomStringUtils.randomAlphanumeric( 4 );
+
+ private final ActorSystemManager actorSystemManager;
+ private final TransferLogSerialization transferLogSerialization;
+ private final AuditLogSerialization auditLogSerialization;
+ private final ActorSystemFig actorSystemFig;
+ private final QakkaFig qakkaFig;
+ private final MetricsService metricsService;
+
+ public QueueSender() {
+
+ Injector injector = App.INJECTOR;
+
+ actorSystemManager = injector.getInstance( ActorSystemManager.class );
+ transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
+ auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
+ actorSystemFig = injector.getInstance( ActorSystemFig.class );
+ qakkaFig = injector.getInstance( QakkaFig.class );
+ metricsService = injector.getInstance( MetricsService.class );
+ }
+
+ @Override
+ public void onReceive(Object message) {
+
+ if ( message instanceof QueueSendRequest) {
+ QueueSendRequest qa = (QueueSendRequest) message;
+
+ // as far as caller is concerned, we are done.
+ getSender().tell( new QueueSendResponse(
+ DistributedQueueService.Status.SUCCESS ), getSender() );
+
+ final QueueWriter.WriteStatus writeStatus = sendMessageToRegion(
+ qa.getQueueName(),
+ qa.getSourceRegion(),
+ qa.getDestRegion(),
+ qa.getMessageId(),
+ qa.getDeliveryTime(),
+ qa.getExpirationTime() );
+
+ logResponse( writeStatus, qa.getQueueName(), qa.getDestRegion(), qa.getMessageId() );
+
+ } else {
+ unhandled( message );
+ }
+ }
+
+
+ QueueWriter.WriteStatus sendMessageToRegion(
+
+ String queueName,
+ String sourceRegion,
+ String destRegion,
+ UUID messageId,
+ Long deliveryTime,
+ Long expirationTime ) {
+
+ Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_SEND ).time();
+ try {
+
+ int maxRetries = qakkaFig.getMaxSendRetries();
+ int retries = 0;
+
+ QueueWriteRequest request = new QueueWriteRequest(
+ queueName, sourceRegion, destRegion, messageId, deliveryTime, expirationTime );
+
+ while (retries++ < maxRetries) {
+ try {
+ Timeout t = new Timeout( qakkaFig.getSendTimeoutSeconds(), TimeUnit.SECONDS );
+
+ Future<Object> fut;
+
+ if (actorSystemManager.getCurrentRegion().equals( destRegion )) {
+
+ // send to current region via local clientActor
+ ActorRef clientActor = actorSystemManager.getClientActor();
+ fut = Patterns.ask( clientActor, request, t );
+
+ } else {
+
+ // send to remote region via cluster client for that region
+ ActorRef clusterClient = actorSystemManager.getClusterClient( destRegion );
+ fut = Patterns.ask(
+ clusterClient, new ClusterClient.Send( "/user/clientActor", request ), t );
+ }
+
+ // wait for response...
+ final Object response = Await.result( fut, t.duration() );
+
+ if (response != null && response instanceof QueueWriteResponse) {
+ QueueWriteResponse qarm = (QueueWriteResponse) response;
+ if (!QueueWriter.WriteStatus.ERROR.equals( qarm.getSendStatus() )) {
+
+ if (retries > 1) {
+ logger.debug( "queueAdd TOTAL_SUCCESS after {} retries", retries );
+ }
+ return qarm.getSendStatus();
+
+ } else {
+ logger.debug( "ERROR STATUS adding to queue, retrying {}", retries );
+ }
+
+ } else if (response != null) {
+ logger.debug( "NULL RESPONSE adding to queue, retrying {}", retries );
+
+ } else {
+ logger.debug( "TIMEOUT adding to queue, retrying {}", retries );
+ }
+
+ } catch (Exception e) {
+ logger.debug( "ERROR adding to queue, retrying " + retries, e );
+ }
+ }
+
+ throw new QakkaRuntimeException( "Error adding to queue after " + retries );
+
+ } finally {
+ timer.stop();
+ }
+ }
+
+
+ void logResponse( QueueWriter.WriteStatus writeStatus, String queueName, String region, UUID messageId ) {
+
+ if ( writeStatus != null
+ && writeStatus.equals( QueueWriter.WriteStatus.ERROR ) ) {
+
+ logger.debug( "ERROR status sending message: {}, {}, {}, {}",
+ new Object[]{queueName, actorSystemFig.getRegionLocal(), region, messageId} );
+
+ auditLogSerialization.recordAuditLog(
+ AuditLog.Action.SEND,
+ AuditLog.Status.ERROR,
+ queueName,
+ region,
+ messageId,
+ null);
+
+ } else if ( writeStatus != null
+ && writeStatus.equals( QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ) ) {
+
+ //logger.debug( "Delivery Success, now removing transfer log: {}, {}, {}, {}",
+ // new Object[]{queueName, actorSystemFig.getRegionLocal(), region, messageId} );
+
+ // queue actor failed to clean up transfer log
+ try {
+ transferLogSerialization.removeTransferLog(
+ queueName, actorSystemFig.getRegionLocal(), region, messageId );
+
+ } catch (QakkaException se) {
+ logger.error( "Unable to delete remove transfer log for {}, {}, {}, {}",
+ new Object[]{queueName, actorSystemFig.getRegionLocal(), region, messageId} );
+ logger.debug( "Unable to delete remove transfer log exception is:", se );
+ }
+
+ } else if ( writeStatus != null
+ && writeStatus.equals( QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED ) ) {
+
+ //logger.debug( "Delivery Success: {}, {}, {}, {}",
+ // new Object[]{queueName, actorSystemFig.getRegionLocal(), region, messageId} );
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
new file mode 100644
index 0000000..20603a5
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.routing.FromConfig;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendRequest;
+
+
+/**
+ * Route messages to QueueWriters
+ */
+public class QueueSenderRouter extends UntypedActor {
+
+ private final ActorRef router;
+
+
+ public QueueSenderRouter() {
+
+ router = getContext().actorOf(
+ FromConfig.getInstance().props(Props.create(QueueSender.class )), "router");
+ }
+
+ @Override
+ public void onReceive(Object message) {
+
+ if ( message instanceof QueueSendRequest) {
+ router.tell( message, getSender() );
+
+ } else {
+ unhandled(message);
+ }
+ }
+}