You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/14 18:54:08 UTC

[12/39] usergrid git commit: add delete queue for cleanup and poll for messages

add delete queue for cleanup and poll for messages


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c7a6ebf0
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c7a6ebf0
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c7a6ebf0

Branch: refs/heads/usergrid-1007-shiro-cache
Commit: c7a6ebf0329a95e6813667d81fa387ecf0240d14
Parents: da6afb1
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 25 10:36:40 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 25 10:36:40 2015 -0600

----------------------------------------------------------------------
 .../persistence/queue/DefaultQueueManager.java  |  5 +++
 .../persistence/queue/QueueManager.java         |  5 +++
 .../queue/impl/SNSQueueManagerImpl.java         |  9 +++++
 .../queue/impl/SQSQueueManagerImpl.java         |  7 ++++
 .../persistence/queue/QueueManagerTest.java     | 37 ++++++++++++++++----
 .../services/queues/ImportQueueManager.java     |  5 +++
 6 files changed, 61 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
index c72e109..d974529 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
@@ -72,4 +72,9 @@ public class DefaultQueueManager implements QueueManager {
         String uuid = UUID.randomUUID().toString();
         queue.add(new QueueMessage(uuid,"handle_"+uuid,body,"put type here"));
     }
+
+    @Override
+    public void deleteQueue() {
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 0ec2337..027abb2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -69,4 +69,9 @@ public interface QueueManager {
      * @throws IOException
      */
     void sendMessage(Object body)throws IOException;
+
+    /**
+     * Delete the queue itself, not just its messages (some implementations are no-ops)
+     */
+    void deleteQueue();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 8fb0f52..bc63f53 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -449,6 +449,15 @@ public class SNSQueueManagerImpl implements QueueManager {
 
     }
 
+    @Override
+    public void deleteQueue() {
+        logger.warn("Deleting queue: "+getReadQueue().getUrl());
+        sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getReadQueue().getUrl()));
+        logger.warn("Deleting queue: "+getReadQueue().getUrl()+"_dead");
+        sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getReadQueue().getUrl()+"_dead"));
+
+    }
+
 
     @Override
     public void commitMessage(final QueueMessage queueMessage) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index 075e90c..daa1cb5 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -327,6 +327,13 @@ public class SQSQueueManagerImpl implements QueueManager {
         return region;
     }
 
+    @Override
+    public void deleteQueue() {
+        logger.warn("Deleting queue: "+getQueue().getUrl());
+        sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getQueue().getUrl()));
+    }
+
+
 
     /**
      * Create the SQS client for the specified settings

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 29e88ce..ac70af6 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -20,7 +20,6 @@
 package org.apache.usergrid.persistence.queue;
 
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -63,16 +62,24 @@ public class QueueManagerTest {
     protected QueueScope scope;
     private QueueManager qm;
 
+    public static long queueSeed = System.currentTimeMillis();
+
 
     @Before
     public void mockApp() {
-        this.scope = new QueueScopeImpl( "testQueue", QueueScope.RegionImplementation.LOCAL);
+
+        this.scope = new QueueScopeImpl( "testQueue"+queueSeed++, QueueScope.RegionImplementation.LOCAL);
         qm = qmf.getQueueManager(scope);
     }
 
+    @org.junit.After
+    public void cleanup(){
+        qm.deleteQueue();
+    }
+
 
     @Test
-    public void send() throws IOException,ClassNotFoundException{
+    public void send() throws Exception{
         String value = "bodytest";
         qm.sendMessage(value);
         List<QueueMessage> messageList = qm.getMessages(1,5000,5000,String.class).toList().toBlocking().last();
@@ -81,13 +88,14 @@ public class QueueManagerTest {
             assertTrue(message.getBody().equals(value));
             qm.commitMessage(message);
         }
+
         messageList = qm.getMessages(1,5000,5000,String.class).toList().toBlocking().last();
         assertTrue(messageList.size() <= 0);
 
     }
 
     @Test
-    public void sendMore() throws IOException,ClassNotFoundException{
+    public void sendMore() throws Exception{
         HashMap<String,String> values = new HashMap<>();
         values.put("test","Test");
 
@@ -107,7 +115,7 @@ public class QueueManagerTest {
     }
 
     @Test
-    public void queueSize() throws IOException,ClassNotFoundException{
+    public void queueSize() throws Exception{
         HashMap<String,String> values = new HashMap<>();
         values.put("test", "Test");
 
@@ -115,8 +123,16 @@ public class QueueManagerTest {
         bodies.add(values);
         long initialDepth = qm.getQueueDepth();
         qm.sendMessages(bodies);
-        long depth = qm.getQueueDepth();
+        long depth = 0;
+        for(int i=0; i<10;i++){
+             depth = qm.getQueueDepth();
+            if(depth>0){
+                break;
+            }
+            Thread.sleep(1000);
+        }
         assertTrue(depth>0);
+
         List<QueueMessage> messageList = qm.getMessages(10,5000,5000,values.getClass()).toList().toBlocking().last();
         assertTrue(messageList.size() <= 500);
         for(QueueMessage message : messageList){
@@ -125,9 +141,16 @@ public class QueueManagerTest {
         if(messageList.size()>0) {
             qm.commitMessages(messageList);
         }
-        depth = qm.getQueueDepth();
+        for(int i=0; i<10;i++){
+            depth = qm.getQueueDepth();
+            if(depth==initialDepth){
+                break;
+            }
+            Thread.sleep(1000);
+        }
         assertEquals(initialDepth, depth);
     }
 
 
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index d74f688..bca9a49 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -68,4 +68,9 @@ public class ImportQueueManager implements QueueManager {
     public void sendMessage( final Object body ) throws IOException {
 
     }
+
+    @Override
+    public void deleteQueue() {
+
+    }
 }