You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/10/03 22:14:16 UTC

git commit: adding serialization

Repository: incubator-usergrid
Updated Branches:
  refs/heads/sqs_queues bfcc33e13 -> a0a8ccd9b


adding serialization


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a0a8ccd9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a0a8ccd9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a0a8ccd9

Branch: refs/heads/sqs_queues
Commit: a0a8ccd9bae2ef2a21cdac50a5f96cd293c771ef
Parents: bfcc33e
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Oct 3 14:14:02 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Oct 3 14:14:02 2014 -0600

----------------------------------------------------------------------
 .../persistence/queue/QueueManager.java         | 10 ++--
 .../persistence/queue/QueueMessage.java         |  7 ++-
 .../queue/impl/SQSQueueManagerImpl.java         | 48 +++++++++++++++++---
 .../persistence/queue/QueueManagerTest.java     | 20 ++++----
 4 files changed, 58 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a0a8ccd9/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 992b533..48bc945 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -17,17 +17,17 @@
  */
 package org.apache.usergrid.persistence.queue;
 
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.List;
 
-/**
- * Created by ApigeeCorporation on 10/3/14.
- */
+
 public interface QueueManager {
     Queue createQueue( );
     Queue getQueue();
     List<QueueMessage> getMessages(int limit,int timeout);
     void commitMessage( QueueMessage queueMessage);
     void commitMessages( List<QueueMessage> queueMessages);
-    void sendMessages(List<String> bodies);
-    void sendMessage(String body);
+    void sendMessages(List<Serializable> bodies) throws IOException;
+    void sendMessage(Serializable body)throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a0a8ccd9/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
index b07d220..1aef3a3 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
@@ -17,13 +17,12 @@
  */
 package org.apache.usergrid.persistence.queue;
 
-
 public class QueueMessage {
-    private final String body;
+    private final Object body;
     private final String messageId;
     private final String handle;
 
-    public QueueMessage(String messageId, String handle, String body) {
+    public QueueMessage(String messageId, String handle, Object body) {
         this.body = body;
         this.messageId = messageId;
         this.handle = handle;
@@ -33,7 +32,7 @@ public class QueueMessage {
         return handle;
     }
 
-    public String getBody(){
+    public Object getBody(){
         return body;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a0a8ccd9/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index d1f526b..84e3cab 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -31,11 +31,17 @@ import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import org.apache.commons.lang.StringUtils;
 import org.apache.usergrid.persistence.queue.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.util.Base64Coder;
 
+import java.io.*;
 import java.util.ArrayList;
 import java.util.List;
 
 public class SQSQueueManagerImpl implements QueueManager {
+    private static final Logger LOG = LoggerFactory.getLogger(SQSQueueManagerImpl.class);
+
     private final AmazonSQSClient sqs;
     private final QueueScope scope;
     private final QueueFig fig;
@@ -80,24 +86,25 @@ public class SQSQueueManagerImpl implements QueueManager {
         return queue;
     }
 
-    public void sendMessage(String body){
-        SendMessageRequest request = new SendMessageRequest(getQueue().getUrl(),body);
+    public void sendMessage(Serializable body) throws IOException{
+        SendMessageRequest request = new SendMessageRequest(getQueue().getUrl(),toString(body));
         sqs.sendMessage(request);
     }
 
-    public void sendMessages(List<String> bodies){
+
+    public void sendMessages(List<Serializable> bodies) throws IOException{
         SendMessageBatchRequest request = new SendMessageBatchRequest(getQueue().getUrl());
         List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
-        for(String body : bodies){
+        for(Serializable body : bodies){
             SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
-            entry.setMessageBody(body);
+            entry.setMessageBody(toString(body));
             entries.add(entry);
         }
         request.setEntries(entries);
         sqs.sendMessageBatch(request);
     }
 
-    public  List<QueueMessage> getMessages( int limit,int timeout){
+    public  List<QueueMessage> getMessages( int limit,int timeout) {
         System.out.println("Receiving messages from MyQueue.\n");
         ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(getQueue().getUrl());
         receiveMessageRequest.setMaxNumberOfMessages(limit);
@@ -105,7 +112,14 @@ public class SQSQueueManagerImpl implements QueueManager {
         List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
         List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
         for (Message message : messages) {
-            QueueMessage queueMessage = new QueueMessage(message.getMessageId(),message.getReceiptHandle(),message.getBody());
+            Object body ;
+            try{
+                body = fromString( message.getBody());
+            }catch (Exception e){
+                LOG.error("failed to deserialize message", e);
+                body  = message.getBody();
+            }
+            QueueMessage queueMessage = new QueueMessage(message.getMessageId(),message.getReceiptHandle(),body);
             queueMessages.add(queueMessage);
         }
         return queueMessages;
@@ -125,6 +139,26 @@ public class SQSQueueManagerImpl implements QueueManager {
         DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(getQueue().getUrl(),entries);
         DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
     }
+
+    /** Read the object from Base64 string. */
+    private static Object fromString( String s ) throws IOException, ClassNotFoundException {
+        byte [] data = Base64Coder.decode(s.toCharArray());
+        ObjectInputStream ois = new ObjectInputStream(
+                new ByteArrayInputStream(  data ) );
+        Object o  = ois.readObject();
+        ois.close();
+        return o;
+    }
+
+    /** Write the object to a Base64 string. */
+    private static String toString( Serializable o ) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream( baos );
+        oos.writeObject( o );
+        oos.close();
+        return new String( Base64Coder.encode( baos.toByteArray() ) );
+    }
+
     public class UsergridAwsCredentialsProvider implements AWSCredentialsProvider {
 
         private AWSCredentials creds;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a0a8ccd9/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index e5f3832..1cded57 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -19,29 +19,25 @@
 
 package org.apache.usergrid.persistence.queue;
 
-
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.collection.util.InvalidEntityGenerator;
 import org.apache.usergrid.persistence.queue.guice.TestQueueModule;
 import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
 import org.jukito.UseModules;
 import org.junit.Before;
-import org.junit.Rule;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
-import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.core.cassandra.ITRunner;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 
 import com.google.inject.Inject;
 
+import java.io.IOException;
 import java.util.List;
-import java.util.UUID;
 
 import static org.junit.Assert.*;
 
-
 @RunWith( ITRunner.class )
 @UseModules( { TestQueueModule.class } )
 public class QueueManagerTest {
@@ -60,19 +56,21 @@ public class QueueManagerTest {
         qm = qmf.getQueueManager(scope);
     }
 
-
+    @Ignore("need aws creds")
     @Test
     public void get() {
         Queue queue = qm.getQueue();
         assertNotNull(queue);
     }
-
+    @Ignore("need aws creds")
     @Test
-    public void send(){
-        qm.sendMessage("bodytest");
+    public void send() throws IOException{
+        String value = "bodytest";
+        qm.sendMessage(value);
         List<QueueMessage> messageList = qm.getMessages(1,5000);
         assertTrue(messageList.size() >= 1);
         for(QueueMessage message : messageList){
+            assertTrue(message.getBody().equals(value));
             qm.commitMessage(message);
         }
         messageList = qm.getMessages(1,5000);