You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/10/03 22:14:16 UTC
git commit: adding serialization
Repository: incubator-usergrid
Updated Branches:
refs/heads/sqs_queues bfcc33e13 -> a0a8ccd9b
adding serialization
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a0a8ccd9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a0a8ccd9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a0a8ccd9
Branch: refs/heads/sqs_queues
Commit: a0a8ccd9bae2ef2a21cdac50a5f96cd293c771ef
Parents: bfcc33e
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Oct 3 14:14:02 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Oct 3 14:14:02 2014 -0600
----------------------------------------------------------------------
.../persistence/queue/QueueManager.java | 10 ++--
.../persistence/queue/QueueMessage.java | 7 ++-
.../queue/impl/SQSQueueManagerImpl.java | 48 +++++++++++++++++---
.../persistence/queue/QueueManagerTest.java | 20 ++++----
4 files changed, 58 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a0a8ccd9/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 992b533..48bc945 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -17,17 +17,17 @@
*/
package org.apache.usergrid.persistence.queue;
+import java.io.IOException;
+import java.io.Serializable;
import java.util.List;
-/**
- * Created by ApigeeCorporation on 10/3/14.
- */
+
public interface QueueManager {
Queue createQueue( );
Queue getQueue();
List<QueueMessage> getMessages(int limit,int timeout);
void commitMessage( QueueMessage queueMessage);
void commitMessages( List<QueueMessage> queueMessages);
- void sendMessages(List<String> bodies);
- void sendMessage(String body);
+ void sendMessages(List<Serializable> bodies) throws IOException;
+ void sendMessage(Serializable body)throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a0a8ccd9/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
index b07d220..1aef3a3 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
@@ -17,13 +17,12 @@
*/
package org.apache.usergrid.persistence.queue;
-
public class QueueMessage {
- private final String body;
+ private final Object body;
private final String messageId;
private final String handle;
- public QueueMessage(String messageId, String handle, String body) {
+ public QueueMessage(String messageId, String handle, Object body) {
this.body = body;
this.messageId = messageId;
this.handle = handle;
@@ -33,7 +32,7 @@ public class QueueMessage {
return handle;
}
- public String getBody(){
+ public Object getBody(){
return body;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a0a8ccd9/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index d1f526b..84e3cab 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -31,11 +31,17 @@ import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import org.apache.commons.lang.StringUtils;
import org.apache.usergrid.persistence.queue.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.util.Base64Coder;
+import java.io.*;
import java.util.ArrayList;
import java.util.List;
public class SQSQueueManagerImpl implements QueueManager {
+ private static final Logger LOG = LoggerFactory.getLogger(SQSQueueManagerImpl.class);
+
private final AmazonSQSClient sqs;
private final QueueScope scope;
private final QueueFig fig;
@@ -80,24 +86,25 @@ public class SQSQueueManagerImpl implements QueueManager {
return queue;
}
- public void sendMessage(String body){
- SendMessageRequest request = new SendMessageRequest(getQueue().getUrl(),body);
+ public void sendMessage(Serializable body) throws IOException{
+ SendMessageRequest request = new SendMessageRequest(getQueue().getUrl(),toString(body));
sqs.sendMessage(request);
}
- public void sendMessages(List<String> bodies){
+
+ public void sendMessages(List<Serializable> bodies) throws IOException{
SendMessageBatchRequest request = new SendMessageBatchRequest(getQueue().getUrl());
List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
- for(String body : bodies){
+ for(Serializable body : bodies){
SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
- entry.setMessageBody(body);
+ entry.setMessageBody(toString(body));
entries.add(entry);
}
request.setEntries(entries);
sqs.sendMessageBatch(request);
}
- public List<QueueMessage> getMessages( int limit,int timeout){
+ public List<QueueMessage> getMessages( int limit,int timeout) {
System.out.println("Receiving messages from MyQueue.\n");
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(getQueue().getUrl());
receiveMessageRequest.setMaxNumberOfMessages(limit);
@@ -105,7 +112,14 @@ public class SQSQueueManagerImpl implements QueueManager {
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
for (Message message : messages) {
- QueueMessage queueMessage = new QueueMessage(message.getMessageId(),message.getReceiptHandle(),message.getBody());
+ Object body ;
+ try{
+ body = fromString( message.getBody());
+ }catch (Exception e){
+ LOG.error("failed to deserialize message", e);
+ body = message.getBody();
+ }
+ QueueMessage queueMessage = new QueueMessage(message.getMessageId(),message.getReceiptHandle(),body);
queueMessages.add(queueMessage);
}
return queueMessages;
@@ -125,6 +139,26 @@ public class SQSQueueManagerImpl implements QueueManager {
DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(getQueue().getUrl(),entries);
DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
}
+
+ /** Read the object from Base64 string. */
+ private static Object fromString( String s ) throws IOException, ClassNotFoundException {
+ byte [] data = Base64Coder.decode(s.toCharArray());
+ ObjectInputStream ois = new ObjectInputStream(
+ new ByteArrayInputStream( data ) );
+ Object o = ois.readObject();
+ ois.close();
+ return o;
+ }
+
+ /** Write the object to a Base64 string. */
+ private static String toString( Serializable o ) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream( baos );
+ oos.writeObject( o );
+ oos.close();
+ return new String( Base64Coder.encode( baos.toByteArray() ) );
+ }
+
public class UsergridAwsCredentialsProvider implements AWSCredentialsProvider {
private AWSCredentials creds;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a0a8ccd9/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index e5f3832..1cded57 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -19,29 +19,25 @@
package org.apache.usergrid.persistence.queue;
-
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.collection.util.InvalidEntityGenerator;
import org.apache.usergrid.persistence.queue.guice.TestQueueModule;
import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
import org.jukito.UseModules;
import org.junit.Before;
-import org.junit.Rule;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
import org.apache.usergrid.persistence.core.cassandra.ITRunner;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import com.google.inject.Inject;
+import java.io.IOException;
import java.util.List;
-import java.util.UUID;
import static org.junit.Assert.*;
-
@RunWith( ITRunner.class )
@UseModules( { TestQueueModule.class } )
public class QueueManagerTest {
@@ -60,19 +56,21 @@ public class QueueManagerTest {
qm = qmf.getQueueManager(scope);
}
-
+ @Ignore("need aws creds")
@Test
public void get() {
Queue queue = qm.getQueue();
assertNotNull(queue);
}
-
+ @Ignore("need aws creds")
@Test
- public void send(){
- qm.sendMessage("bodytest");
+ public void send() throws IOException{
+ String value = "bodytest";
+ qm.sendMessage(value);
List<QueueMessage> messageList = qm.getMessages(1,5000);
assertTrue(messageList.size() >= 1);
for(QueueMessage message : messageList){
+ assertTrue(message.getBody().equals(value));
qm.commitMessage(message);
}
messageList = qm.getMessages(1,5000);