You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/10/06 23:04:45 UTC
git commit: comments
Repository: incubator-usergrid
Updated Branches:
refs/heads/sqs_queues 6f971cf6e -> 4cb7a9259
comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/4cb7a925
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/4cb7a925
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/4cb7a925
Branch: refs/heads/sqs_queues
Commit: 4cb7a9259c30d8905ec7f0feb6b81f3cf099c323
Parents: 6f971cf
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 6 15:04:27 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 6 15:04:27 2014 -0600
----------------------------------------------------------------------
.../persistence/queue/QueueManager.java | 40 ++++++++++++++++----
.../queue/impl/SQSQueueManagerImpl.java | 20 ++++------
.../persistence/queue/QueueManagerTest.java | 6 ---
3 files changed, 39 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4cb7a925/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 1f5a9e2..89abe40 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -21,20 +21,44 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.List;
-
+/**
+ * Manages queues for Usergrid. The current implementation is SQS-based.
+ */
public interface QueueManager {
- Queue createQueue( );
-
- Queue getQueue();
-
- List<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass) throws ClassNotFoundException, IOException;
+ /**
+ * Read messages from the queue.
+ * @param limit maximum number of messages to read
+ * @param transactionTimeout timeout in ms
+ * @param waitTime time to wait for the next message, in ms
+ * @param klass class used to deserialize each message body
+ * @return list of queue messages
+ */
+ List<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass);
+ /**
+ * Commit the transaction
+ * @param queueMessage
+ */
void commitMessage( QueueMessage queueMessage);
+ /**
+ * Commit multiple messages.
+ * @param queueMessages
+ */
void commitMessages( List<QueueMessage> queueMessages);
- void sendMessages(List bodies) throws IOException;
+ /**
+ * Send messages to the queue.
+ * @param bodies message bodies; each must be serializable
+ * @throws IOException
+ */
+ void sendMessages(List<Serializable> bodies) throws IOException;
- void sendMessage(Object body)throws IOException;
+ /**
+ * Send a single message to the queue.
+ * @param body message body; must be serializable
+ * @throws IOException
+ */
+ void sendMessage(Serializable body)throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4cb7a925/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index 4614089..85b7f1e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -21,8 +21,6 @@ import com.amazonaws.AmazonClientException;
import com.amazonaws.SDKGlobalConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
-import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQSClient;
@@ -36,13 +34,11 @@ import org.apache.commons.lang.StringUtils;
import org.apache.usergrid.persistence.queue.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.util.Base64Coder;
-
import java.io.*;
import java.util.ArrayList;
import java.util.List;
-public class SQSQueueManagerImpl implements QueueManager {
+public class SQSQueueManagerImpl<T> implements QueueManager {
private static final Logger LOG = LoggerFactory.getLogger(SQSQueueManagerImpl.class);
private final AmazonSQSClient sqs;
@@ -72,7 +68,6 @@ public class SQSQueueManagerImpl implements QueueManager {
}
}
- @Override
public Queue createQueue(){
String name = getName();
CreateQueueRequest createQueueRequest = new CreateQueueRequest()
@@ -87,7 +82,6 @@ public class SQSQueueManagerImpl implements QueueManager {
String name = scope.getApplication().getType() + scope.getApplication().getUuid().toString() + scope.getName();
return name;
}
- @Override
public Queue getQueue(){
if(queue == null) {
ListQueuesResult result = sqs.listQueues();
@@ -106,7 +100,7 @@ public class SQSQueueManagerImpl implements QueueManager {
}
@Override
- public List<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) throws ClassNotFoundException, IOException {
+ public List<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
waitTime = waitTime/1000;
String url = getQueue().getUrl();
LOG.info("Getting {} messages from {}",limit,url);
@@ -124,7 +118,7 @@ public class SQSQueueManagerImpl implements QueueManager {
body = fromString(message.getBody(),klass);
}catch (Exception e){
LOG.error("failed to deserialize message", e);
- throw e;
+ throw new RuntimeException(e);
}
QueueMessage queueMessage = new QueueMessage(message.getMessageId(),message.getReceiptHandle(),body);
queueMessages.add(queueMessage);
@@ -143,15 +137,15 @@ public class SQSQueueManagerImpl implements QueueManager {
}
@Override
- public void sendMessages(List bodies) throws IOException {
+ public void sendMessages(List<Serializable> bodies) throws IOException {
String url = getQueue().getUrl();
LOG.info("Sending Messages...{} to {}",bodies.size(),url);
SendMessageBatchRequest request = new SendMessageBatchRequest(url);
List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
- for(Object body : bodies){
+ for(Serializable body : bodies){
SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
- entry.setMessageBody(toString((Serializable)body));
+ entry.setMessageBody(toString(body));
entries.add(entry);
}
request.setEntries(entries);
@@ -160,7 +154,7 @@ public class SQSQueueManagerImpl implements QueueManager {
}
@Override
- public void sendMessage(Object body) throws IOException {
+ public void sendMessage(Serializable body) throws IOException {
String url = getQueue().getUrl();
LOG.info("Sending Message...{} to {}",body.toString(),url);
SendMessageRequest request = new SendMessageRequest(url,toString((Serializable)body));
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4cb7a925/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 8e38f8c..b8b7967 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -58,12 +58,6 @@ public class QueueManagerTest {
@Ignore("need aws creds")
@Test
- public void get() {
- Queue queue = qm.getQueue();
- assertNotNull(queue);
- }
- @Ignore("need aws creds")
- @Test
public void send() throws IOException,ClassNotFoundException{
String value = "bodytest";
qm.sendMessage(value);