You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/14 18:54:08 UTC
[12/39] usergrid git commit: add delete queue for cleanup and poll
for messages
add delete queue for cleanup and poll for messages
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c7a6ebf0
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c7a6ebf0
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c7a6ebf0
Branch: refs/heads/usergrid-1007-shiro-cache
Commit: c7a6ebf0329a95e6813667d81fa387ecf0240d14
Parents: da6afb1
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 25 10:36:40 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 25 10:36:40 2015 -0600
----------------------------------------------------------------------
.../persistence/queue/DefaultQueueManager.java | 5 +++
.../persistence/queue/QueueManager.java | 5 +++
.../queue/impl/SNSQueueManagerImpl.java | 9 +++++
.../queue/impl/SQSQueueManagerImpl.java | 7 ++++
.../persistence/queue/QueueManagerTest.java | 37 ++++++++++++++++----
.../services/queues/ImportQueueManager.java | 5 +++
6 files changed, 61 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
index c72e109..d974529 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
@@ -72,4 +72,9 @@ public class DefaultQueueManager implements QueueManager {
String uuid = UUID.randomUUID().toString();
queue.add(new QueueMessage(uuid,"handle_"+uuid,body,"put type here"));
}
+
+ @Override
+ public void deleteQueue() {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 0ec2337..027abb2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -69,4 +69,9 @@ public interface QueueManager {
* @throws IOException
*/
void sendMessage(Object body)throws IOException;
+
+ /**
+ * purge messages
+ */
+ void deleteQueue();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 8fb0f52..bc63f53 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -449,6 +449,15 @@ public class SNSQueueManagerImpl implements QueueManager {
}
+ @Override
+ public void deleteQueue() {
+ logger.warn("Deleting queue: "+getReadQueue().getUrl());
+ sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getReadQueue().getUrl()));
+ logger.warn("Deleting queue: "+getReadQueue().getUrl()+"_dead");
+ sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getReadQueue().getUrl()+"_dead"));
+
+ }
+
@Override
public void commitMessage(final QueueMessage queueMessage) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index 075e90c..daa1cb5 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -327,6 +327,13 @@ public class SQSQueueManagerImpl implements QueueManager {
return region;
}
+ @Override
+ public void deleteQueue() {
+ logger.warn("Deleting queue: "+getQueue().getUrl());
+ sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getQueue().getUrl()));
+ }
+
+
/**
* Create the SQS client for the specified settings
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 29e88ce..ac70af6 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -20,7 +20,6 @@
package org.apache.usergrid.persistence.queue;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -63,16 +62,24 @@ public class QueueManagerTest {
protected QueueScope scope;
private QueueManager qm;
+ public static long queueSeed = System.currentTimeMillis();
+
@Before
public void mockApp() {
- this.scope = new QueueScopeImpl( "testQueue", QueueScope.RegionImplementation.LOCAL);
+
+ this.scope = new QueueScopeImpl( "testQueue"+queueSeed++, QueueScope.RegionImplementation.LOCAL);
qm = qmf.getQueueManager(scope);
}
+ @org.junit.After
+ public void cleanup(){
+ qm.deleteQueue();
+ }
+
@Test
- public void send() throws IOException,ClassNotFoundException{
+ public void send() throws Exception{
String value = "bodytest";
qm.sendMessage(value);
List<QueueMessage> messageList = qm.getMessages(1,5000,5000,String.class).toList().toBlocking().last();
@@ -81,13 +88,14 @@ public class QueueManagerTest {
assertTrue(message.getBody().equals(value));
qm.commitMessage(message);
}
+
messageList = qm.getMessages(1,5000,5000,String.class).toList().toBlocking().last();
assertTrue(messageList.size() <= 0);
}
@Test
- public void sendMore() throws IOException,ClassNotFoundException{
+ public void sendMore() throws Exception{
HashMap<String,String> values = new HashMap<>();
values.put("test","Test");
@@ -107,7 +115,7 @@ public class QueueManagerTest {
}
@Test
- public void queueSize() throws IOException,ClassNotFoundException{
+ public void queueSize() throws Exception{
HashMap<String,String> values = new HashMap<>();
values.put("test", "Test");
@@ -115,8 +123,16 @@ public class QueueManagerTest {
bodies.add(values);
long initialDepth = qm.getQueueDepth();
qm.sendMessages(bodies);
- long depth = qm.getQueueDepth();
+ long depth = 0;
+ for(int i=0; i<10;i++){
+ depth = qm.getQueueDepth();
+ if(depth>0){
+ break;
+ }
+ Thread.sleep(1000);
+ }
assertTrue(depth>0);
+
List<QueueMessage> messageList = qm.getMessages(10,5000,5000,values.getClass()).toList().toBlocking().last();
assertTrue(messageList.size() <= 500);
for(QueueMessage message : messageList){
@@ -125,9 +141,16 @@ public class QueueManagerTest {
if(messageList.size()>0) {
qm.commitMessages(messageList);
}
- depth = qm.getQueueDepth();
+ for(int i=0; i<10;i++){
+ depth = qm.getQueueDepth();
+ if(depth==initialDepth){
+ break;
+ }
+ Thread.sleep(1000);
+ }
assertEquals(initialDepth, depth);
}
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c7a6ebf0/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index d74f688..bca9a49 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -68,4 +68,9 @@ public class ImportQueueManager implements QueueManager {
public void sendMessage( final Object body ) throws IOException {
}
+
+ @Override
+ public void deleteQueue() {
+
+ }
}