You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/10/07 21:33:16 UTC

[4/9] git commit: change serializer

change serializer


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6f971cf6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6f971cf6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6f971cf6

Branch: refs/heads/collection_multiget
Commit: 6f971cf6ea67deac3b4ec71f50ebe66aea1de69e
Parents: 028d3d6
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 6 14:33:26 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 6 14:33:26 2014 -0600

----------------------------------------------------------------------
 .../persistence/queue/QueueManager.java         |   6 +-
 .../queue/impl/SQSQueueManagerImpl.java         | 101 +++++++++++--------
 .../persistence/queue/QueueManagerTest.java     |   6 +-
 .../services/notifications/QueueListener.java   |   2 +-
 4 files changed, 64 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6f971cf6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 3509e4e..1f5a9e2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -28,13 +28,13 @@ public interface QueueManager {
 
     Queue getQueue();
 
-    List<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime);
+    List<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass) throws ClassNotFoundException, IOException;
 
     void commitMessage( QueueMessage queueMessage);
 
     void commitMessages( List<QueueMessage> queueMessages);
 
-    void sendMessages(List<Serializable> bodies) throws IOException;
+    void sendMessages(List bodies) throws IOException;
 
-    void sendMessage(Serializable body)throws IOException;
+    void sendMessage(Object body)throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6f971cf6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index cf6ff45..4614089 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -27,7 +27,10 @@ import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.sqs.AmazonSQSClient;
 import com.amazonaws.services.sqs.model.*;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
 import com.google.inject.Inject;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.google.inject.assistedinject.Assisted;
 import org.apache.commons.lang.StringUtils;
 import org.apache.usergrid.persistence.queue.*;
@@ -45,7 +48,10 @@ public class SQSQueueManagerImpl implements QueueManager {
     private final AmazonSQSClient sqs;
     private final QueueScope scope;
     private final QueueFig fig;
+    private final ObjectMapper mapper;
     private Queue queue;
+    private static SmileFactory smileFactory = new SmileFactory();
+
 
     @Inject
     public SQSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig){
@@ -56,9 +62,17 @@ public class SQSQueueManagerImpl implements QueueManager {
         Regions regions = Regions.fromName(fig.getRegion());
         Region region = Region.getRegion(regions);
         sqs.setRegion(region);
+        try {
+            smileFactory.delegateToTextual(true);
+            mapper = new ObjectMapper( smileFactory );
+            mapper.enable(SerializationFeature.INDENT_OUTPUT);
+            mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
+        } catch ( Exception e ) {
+            throw new RuntimeException("Error setting up mapper", e);
+        }
     }
 
-
+    @Override
     public Queue createQueue(){
         String name = getName();
         CreateQueueRequest createQueueRequest = new CreateQueueRequest()
@@ -73,7 +87,7 @@ public class SQSQueueManagerImpl implements QueueManager {
         String name = scope.getApplication().getType() + scope.getApplication().getUuid().toString() + scope.getName();
         return name;
     }
-
+    @Override
     public Queue getQueue(){
         if(queue == null) {
             ListQueuesResult result =  sqs.listQueues();
@@ -91,36 +105,14 @@ public class SQSQueueManagerImpl implements QueueManager {
         return queue;
     }
 
-    public void sendMessage(Serializable body) throws IOException{
-        String url = getQueue().getUrl();
-        LOG.info("Sending Message...{} to {}",body.toString(),url);
-        SendMessageRequest request = new SendMessageRequest(url,toString(body));
-        sqs.sendMessage(request);
-    }
-
-
-    public void sendMessages(List<Serializable> bodies) throws IOException{
-        String url = getQueue().getUrl();
-        LOG.info("Sending Messages...{} to {}",bodies.size(),url);
-
-        SendMessageBatchRequest request = new SendMessageBatchRequest(url);
-        List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
-        for(Serializable body : bodies){
-            SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
-            entry.setMessageBody(toString(body));
-            entries.add(entry);
-        }
-        request.setEntries(entries);
-        sqs.sendMessageBatch(request);
-    }
-
-    public  List<QueueMessage> getMessages( int limit,int timeout, int waitTime) {
+    @Override
+    public List<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) throws ClassNotFoundException, IOException {
         waitTime = waitTime/1000;
         String url = getQueue().getUrl();
         LOG.info("Getting {} messages from {}",limit,url);
         ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
         receiveMessageRequest.setMaxNumberOfMessages(limit);
-        receiveMessageRequest.setVisibilityTimeout(timeout);
+        receiveMessageRequest.setVisibilityTimeout(transactionTimeout);
         receiveMessageRequest.setWaitTimeSeconds(waitTime);
         ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
         List<Message> messages = result.getMessages();
@@ -129,10 +121,10 @@ public class SQSQueueManagerImpl implements QueueManager {
         for (Message message : messages) {
             Object body ;
             try{
-                body = fromString(message.getBody());
+                body = fromString(message.getBody(),klass);
             }catch (Exception e){
                 LOG.error("failed to deserialize message", e);
-                body  = message.getBody();
+                throw e;
             }
             QueueMessage queueMessage = new QueueMessage(message.getMessageId(),message.getReceiptHandle(),body);
             queueMessages.add(queueMessage);
@@ -140,7 +132,8 @@ public class SQSQueueManagerImpl implements QueueManager {
         return queueMessages;
     }
 
-    public void commitMessage( QueueMessage queueMessage){
+    @Override
+    public void commitMessage(QueueMessage queueMessage) {
         String url = getQueue().getUrl();
         LOG.info("Commit message {} to queue {}",queueMessage.getMessageId(),url);
 
@@ -149,7 +142,33 @@ public class SQSQueueManagerImpl implements QueueManager {
                 .withReceiptHandle(queueMessage.getHandle()));
     }
 
-    public void commitMessages( List<QueueMessage> queueMessages){
+    @Override
+    public void sendMessages(List bodies) throws IOException {
+        String url = getQueue().getUrl();
+        LOG.info("Sending Messages...{} to {}",bodies.size(),url);
+
+        SendMessageBatchRequest request = new SendMessageBatchRequest(url);
+        List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
+        for(Object body : bodies){
+            SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
+            entry.setMessageBody(toString((Serializable)body));
+            entries.add(entry);
+        }
+        request.setEntries(entries);
+        sqs.sendMessageBatch(request);
+
+    }
+
+    @Override
+    public void sendMessage(Object body) throws IOException {
+        String url = getQueue().getUrl();
+        LOG.info("Sending Message...{} to {}",body.toString(),url);
+        SendMessageRequest request = new SendMessageRequest(url,toString((Serializable)body));
+        sqs.sendMessage(request);
+    }
+
+    @Override
+    public void commitMessages(List<QueueMessage> queueMessages) {
         String url = getQueue().getUrl();
         LOG.info("Commit messages {} to queue {}",queueMessages.size(),url);
         List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
@@ -160,27 +179,21 @@ public class SQSQueueManagerImpl implements QueueManager {
         DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
         boolean successful = result.getFailed().size() > 0;
         if(!successful){
-            LOG.error("Commit failed {} messages",result.getFailed().size());
+            LOG.error("Commit failed {} messages", result.getFailed().size());
         }
     }
 
+
+
     /** Read the object from Base64 string. */
-    private static Object fromString( String s ) throws IOException, ClassNotFoundException {
-        byte [] data = Base64Coder.decode(s.toCharArray());
-        ObjectInputStream ois = new ObjectInputStream(
-                new ByteArrayInputStream(  data ) );
-        Object o  = ois.readObject();
-        ois.close();
+    private Object fromString( String s, Class klass ) throws IOException, ClassNotFoundException {
+        Object o =  mapper.readValue(s,klass);
         return o;
     }
 
     /** Write the object to a Base64 string. */
-    private static String toString( Serializable o ) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ObjectOutputStream oos = new ObjectOutputStream( baos );
-        oos.writeObject( o );
-        oos.close();
-        return new String( Base64Coder.encode( baos.toByteArray() ) );
+    private  String toString( Serializable o ) throws IOException {
+        return mapper.writeValueAsString(o);
     }
 
     public class UsergridAwsCredentialsProvider implements AWSCredentialsProvider {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6f971cf6/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 772b75e..8e38f8c 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -64,16 +64,16 @@ public class QueueManagerTest {
     }
     @Ignore("need aws creds")
     @Test
-    public void send() throws IOException{
+    public void send() throws IOException,ClassNotFoundException{
         String value = "bodytest";
         qm.sendMessage(value);
-        List<QueueMessage> messageList = qm.getMessages(1,5000,5000);
+        List<QueueMessage> messageList = qm.getMessages(1,5000,5000,String.class);
         assertTrue(messageList.size() >= 1);
         for(QueueMessage message : messageList){
             assertTrue(message.getBody().equals(value));
             qm.commitMessage(message);
         }
-        messageList = qm.getMessages(1,5000,5000);
+        messageList = qm.getMessages(1,5000,5000,String.class);
         assertTrue(messageList.size() <= 0);
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6f971cf6/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index a381c70..accfa94 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -138,7 +138,7 @@ public class QueueListener  {
                 LOG.info("getting from queue {} ", queueName);
                 QueueScope queueScope = new QueueScopeImpl(new SimpleId(smf.getManagementAppId(),"notifications"),queueName);
                 QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
-                List<QueueMessage> messages = queueManager.getMessages(getBatchSize(),MESSAGE_TRANSACTION_TIMEOUT,5000);
+                List<QueueMessage> messages = queueManager.getMessages(getBatchSize(),MESSAGE_TRANSACTION_TIMEOUT,5000,ApplicationQueueMessage.class);
                 LOG.info("retrieved batch of {} messages from queue {} ", messages.size(),queueName);
 
                 if (messages.size() > 0) {