You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/10/06 23:38:48 UTC
[4/6] git commit: change serializer
change serializer
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6f971cf6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6f971cf6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6f971cf6
Branch: refs/heads/two-dot-o
Commit: 6f971cf6ea67deac3b4ec71f50ebe66aea1de69e
Parents: 028d3d6
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 6 14:33:26 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 6 14:33:26 2014 -0600
----------------------------------------------------------------------
.../persistence/queue/QueueManager.java | 6 +-
.../queue/impl/SQSQueueManagerImpl.java | 101 +++++++++++--------
.../persistence/queue/QueueManagerTest.java | 6 +-
.../services/notifications/QueueListener.java | 2 +-
4 files changed, 64 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6f971cf6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 3509e4e..1f5a9e2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -28,13 +28,13 @@ public interface QueueManager {
Queue getQueue();
- List<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime);
+ List<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass) throws ClassNotFoundException, IOException;
void commitMessage( QueueMessage queueMessage);
void commitMessages( List<QueueMessage> queueMessages);
- void sendMessages(List<Serializable> bodies) throws IOException;
+ void sendMessages(List bodies) throws IOException;
- void sendMessage(Serializable body)throws IOException;
+ void sendMessage(Object body)throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6f971cf6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index cf6ff45..4614089 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -27,7 +27,10 @@ import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.*;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.inject.Inject;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.inject.assistedinject.Assisted;
import org.apache.commons.lang.StringUtils;
import org.apache.usergrid.persistence.queue.*;
@@ -45,7 +48,10 @@ public class SQSQueueManagerImpl implements QueueManager {
private final AmazonSQSClient sqs;
private final QueueScope scope;
private final QueueFig fig;
+ private final ObjectMapper mapper;
private Queue queue;
+ private static SmileFactory smileFactory = new SmileFactory();
+
@Inject
public SQSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig){
@@ -56,9 +62,17 @@ public class SQSQueueManagerImpl implements QueueManager {
Regions regions = Regions.fromName(fig.getRegion());
Region region = Region.getRegion(regions);
sqs.setRegion(region);
+ try {
+ smileFactory.delegateToTextual(true);
+ mapper = new ObjectMapper( smileFactory );
+ mapper.enable(SerializationFeature.INDENT_OUTPUT);
+ mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
+ } catch ( Exception e ) {
+ throw new RuntimeException("Error setting up mapper", e);
+ }
}
-
+ @Override
public Queue createQueue(){
String name = getName();
CreateQueueRequest createQueueRequest = new CreateQueueRequest()
@@ -73,7 +87,7 @@ public class SQSQueueManagerImpl implements QueueManager {
String name = scope.getApplication().getType() + scope.getApplication().getUuid().toString() + scope.getName();
return name;
}
-
+ @Override
public Queue getQueue(){
if(queue == null) {
ListQueuesResult result = sqs.listQueues();
@@ -91,36 +105,14 @@ public class SQSQueueManagerImpl implements QueueManager {
return queue;
}
- public void sendMessage(Serializable body) throws IOException{
- String url = getQueue().getUrl();
- LOG.info("Sending Message...{} to {}",body.toString(),url);
- SendMessageRequest request = new SendMessageRequest(url,toString(body));
- sqs.sendMessage(request);
- }
-
-
- public void sendMessages(List<Serializable> bodies) throws IOException{
- String url = getQueue().getUrl();
- LOG.info("Sending Messages...{} to {}",bodies.size(),url);
-
- SendMessageBatchRequest request = new SendMessageBatchRequest(url);
- List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
- for(Serializable body : bodies){
- SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
- entry.setMessageBody(toString(body));
- entries.add(entry);
- }
- request.setEntries(entries);
- sqs.sendMessageBatch(request);
- }
-
- public List<QueueMessage> getMessages( int limit,int timeout, int waitTime) {
+ @Override
+ public List<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) throws ClassNotFoundException, IOException {
waitTime = waitTime/1000;
String url = getQueue().getUrl();
LOG.info("Getting {} messages from {}",limit,url);
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
receiveMessageRequest.setMaxNumberOfMessages(limit);
- receiveMessageRequest.setVisibilityTimeout(timeout);
+ receiveMessageRequest.setVisibilityTimeout(transactionTimeout);
receiveMessageRequest.setWaitTimeSeconds(waitTime);
ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
List<Message> messages = result.getMessages();
@@ -129,10 +121,10 @@ public class SQSQueueManagerImpl implements QueueManager {
for (Message message : messages) {
Object body ;
try{
- body = fromString(message.getBody());
+ body = fromString(message.getBody(),klass);
}catch (Exception e){
LOG.error("failed to deserialize message", e);
- body = message.getBody();
+ throw e;
}
QueueMessage queueMessage = new QueueMessage(message.getMessageId(),message.getReceiptHandle(),body);
queueMessages.add(queueMessage);
@@ -140,7 +132,8 @@ public class SQSQueueManagerImpl implements QueueManager {
return queueMessages;
}
- public void commitMessage( QueueMessage queueMessage){
+ @Override
+ public void commitMessage(QueueMessage queueMessage) {
String url = getQueue().getUrl();
LOG.info("Commit message {} to queue {}",queueMessage.getMessageId(),url);
@@ -149,7 +142,33 @@ public class SQSQueueManagerImpl implements QueueManager {
.withReceiptHandle(queueMessage.getHandle()));
}
- public void commitMessages( List<QueueMessage> queueMessages){
+ @Override
+ public void sendMessages(List bodies) throws IOException {
+ String url = getQueue().getUrl();
+ LOG.info("Sending Messages...{} to {}",bodies.size(),url);
+
+ SendMessageBatchRequest request = new SendMessageBatchRequest(url);
+ List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
+ for(Object body : bodies){
+ SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
+ entry.setMessageBody(toString((Serializable)body));
+ entries.add(entry);
+ }
+ request.setEntries(entries);
+ sqs.sendMessageBatch(request);
+
+ }
+
+ @Override
+ public void sendMessage(Object body) throws IOException {
+ String url = getQueue().getUrl();
+ LOG.info("Sending Message...{} to {}",body.toString(),url);
+ SendMessageRequest request = new SendMessageRequest(url,toString((Serializable)body));
+ sqs.sendMessage(request);
+ }
+
+ @Override
+ public void commitMessages(List<QueueMessage> queueMessages) {
String url = getQueue().getUrl();
LOG.info("Commit messages {} to queue {}",queueMessages.size(),url);
List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
@@ -160,27 +179,21 @@ public class SQSQueueManagerImpl implements QueueManager {
DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
boolean successful = result.getFailed().size() > 0;
if(!successful){
- LOG.error("Commit failed {} messages",result.getFailed().size());
+ LOG.error("Commit failed {} messages", result.getFailed().size());
}
}
+
+
/** Deserialize the message body string into an instance of the given class using the Jackson mapper. */
- private static Object fromString( String s ) throws IOException, ClassNotFoundException {
- byte [] data = Base64Coder.decode(s.toCharArray());
- ObjectInputStream ois = new ObjectInputStream(
- new ByteArrayInputStream( data ) );
- Object o = ois.readObject();
- ois.close();
+ private Object fromString( String s, Class klass ) throws IOException, ClassNotFoundException {
+ Object o = mapper.readValue(s,klass);
return o;
}
/** Serialize the object to a string using the Jackson mapper. */
- private static String toString( Serializable o ) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream( baos );
- oos.writeObject( o );
- oos.close();
- return new String( Base64Coder.encode( baos.toByteArray() ) );
+ private String toString( Serializable o ) throws IOException {
+ return mapper.writeValueAsString(o);
}
public class UsergridAwsCredentialsProvider implements AWSCredentialsProvider {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6f971cf6/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 772b75e..8e38f8c 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -64,16 +64,16 @@ public class QueueManagerTest {
}
@Ignore("need aws creds")
@Test
- public void send() throws IOException{
+ public void send() throws IOException,ClassNotFoundException{
String value = "bodytest";
qm.sendMessage(value);
- List<QueueMessage> messageList = qm.getMessages(1,5000,5000);
+ List<QueueMessage> messageList = qm.getMessages(1,5000,5000,String.class);
assertTrue(messageList.size() >= 1);
for(QueueMessage message : messageList){
assertTrue(message.getBody().equals(value));
qm.commitMessage(message);
}
- messageList = qm.getMessages(1,5000,5000);
+ messageList = qm.getMessages(1,5000,5000,String.class);
assertTrue(messageList.size() <= 0);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6f971cf6/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index a381c70..accfa94 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -138,7 +138,7 @@ public class QueueListener {
LOG.info("getting from queue {} ", queueName);
QueueScope queueScope = new QueueScopeImpl(new SimpleId(smf.getManagementAppId(),"notifications"),queueName);
QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
- List<QueueMessage> messages = queueManager.getMessages(getBatchSize(),MESSAGE_TRANSACTION_TIMEOUT,5000);
+ List<QueueMessage> messages = queueManager.getMessages(getBatchSize(),MESSAGE_TRANSACTION_TIMEOUT,5000,ApplicationQueueMessage.class);
LOG.info("retrieved batch of {} messages from queue {} ", messages.size(),queueName);
if (messages.size() > 0) {