You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/10/07 21:27:30 UTC

[14/17] git commit: prevent queue manager from blowing up

prevent queue manager from blowing up


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/f4b1efb1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/f4b1efb1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/f4b1efb1

Branch: refs/heads/two-dot-o-events
Commit: f4b1efb1a7d46370747f0156643751b8e03011be
Parents: ba47088
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Oct 7 12:57:26 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Oct 7 12:57:26 2014 -0600

----------------------------------------------------------------------
 .../queue/impl/SQSQueueManagerImpl.java         | 33 ++++++++++++++------
 1 file changed, 23 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4b1efb1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index 6fa5bf8..240c380 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -43,10 +43,10 @@ import java.util.UUID;
 public class SQSQueueManagerImpl implements QueueManager {
     private static final Logger LOG = LoggerFactory.getLogger(SQSQueueManagerImpl.class);
 
-    private final AmazonSQSClient sqs;
-    private final QueueScope scope;
-    private final QueueFig fig;
-    private final ObjectMapper mapper;
+    private  AmazonSQSClient sqs;
+    private  QueueScope scope;
+    private  QueueFig fig;
+    private  ObjectMapper mapper;
     private Queue queue;
     private static SmileFactory smileFactory = new SmileFactory();
 
@@ -55,18 +55,19 @@ public class SQSQueueManagerImpl implements QueueManager {
     public SQSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig){
         this.fig = fig;
         this.scope = scope;
-        UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
-        this.sqs = new AmazonSQSClient(ugProvider.getCredentials());
-        Regions regions = Regions.fromName(fig.getRegion());
-        Region region = Region.getRegion(regions);
-        sqs.setRegion(region);
         try {
+            UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+            this.sqs = new AmazonSQSClient(ugProvider.getCredentials());
+            Regions regions = Regions.fromName(fig.getRegion());
+            Region region = Region.getRegion(regions);
+            sqs.setRegion(region);
             smileFactory.delegateToTextual(true);
             mapper = new ObjectMapper( smileFactory );
             mapper.enable(SerializationFeature.INDENT_OUTPUT);
             mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
         } catch ( Exception e ) {
-            throw new RuntimeException("Error setting up mapper", e);
+            LOG.error("failed to setup SQS",e);
+//            throw new RuntimeException("Error setting up mapper", e);
         }
     }
 
@@ -103,6 +104,10 @@ public class SQSQueueManagerImpl implements QueueManager {
 
     @Override
     public List<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
+        if(sqs == null){
+            LOG.error("Sqs is null");
+            return new ArrayList<>();
+        }
         waitTime = waitTime/1000;
         String url = getQueue().getUrl();
         LOG.info("Getting {} messages from {}", limit, url);
@@ -130,6 +135,10 @@ public class SQSQueueManagerImpl implements QueueManager {
 
     @Override
     public void sendMessages(List bodies) throws IOException {
+        if(sqs == null){
+            LOG.error("Sqs is null");
+            return;
+        }
         String url = getQueue().getUrl();
         LOG.info("Sending Messages...{} to {}",bodies.size(),url);
 
@@ -148,6 +157,10 @@ public class SQSQueueManagerImpl implements QueueManager {
 
     @Override
     public void sendMessage(Object body) throws IOException {
+        if(sqs == null){
+            LOG.error("Sqs is null");
+            return;
+        }
         String url = getQueue().getUrl();
         LOG.info("Sending Message...{} to {}",body.toString(),url);
         SendMessageRequest request = new SendMessageRequest(url,toString((Serializable)body));