You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/16 23:08:52 UTC

[6/9] usergrid git commit: change queue impl

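Change the in-memory queue implementation: DefaultQueueManager now uses a bounded ArrayBlockingQueue (capacity 1000) instead of a LinkedBlockingQueue, getMessages polls with a 100 ms timeout instead of draining the queue, and sendMessage/sendMessages use a blocking put() and are no longer synchronized.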


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4fa6749f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4fa6749f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4fa6749f

Branch: refs/heads/remove-inmemory-event-service
Commit: 4fa6749f027ece9388725dcdeb875f4bddbf7f5f
Parents: 0cc225f
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Oct 16 15:07:36 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Oct 16 15:07:36 2015 -0600

----------------------------------------------------------------------
 .../persistence/queue/DefaultQueueManager.java  | 34 ++++++++++++++++----
 1 file changed, 27 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/4fa6749f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
index 0ef2849..ae0b0aa 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
@@ -30,17 +30,29 @@ import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Default queue manager implementation, uses in memory linked queue
  */
 public class DefaultQueueManager implements QueueManager {
-    public LinkedBlockingQueue<QueueMessage> queue = new LinkedBlockingQueue<>();
+    public ArrayBlockingQueue<QueueMessage> queue = new ArrayBlockingQueue<>(1000);
 
     @Override
-    public  Observable<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
+    public    Observable<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
         List<QueueMessage> returnQueue = new ArrayList<>();
-        queue.drainTo(returnQueue,1000);
+        try {
+            QueueMessage message=null;
+            int count = 10;
+            do {
+                message = queue.poll(100, TimeUnit.MILLISECONDS);
+                if (message != null) {
+                    returnQueue.add(message);
+                }
+            }while(message!=null && count-->0);
+        }catch (InterruptedException ie){
+            throw new RuntimeException(ie);
+        }
         return Observable.from( returnQueue);
     }
 
@@ -58,17 +70,25 @@ public class DefaultQueueManager implements QueueManager {
     }
 
     @Override
-    public synchronized void sendMessages(List bodies) throws IOException {
+    public  void sendMessages(List bodies) throws IOException {
         for(Object body : bodies){
             String uuid = UUID.randomUUID().toString();
-            queue.add(new QueueMessage(uuid,"handle_"+uuid,body,"putappriate type here"));
+            try {
+                queue.put(new QueueMessage(uuid, "handle_" + uuid, body, "put type here"));
+            }catch (InterruptedException ie){
+                throw new RuntimeException(ie);
+            }
         }
     }
 
     @Override
-    public synchronized void sendMessage(Object body) throws IOException {
+    public  void sendMessage(Object body) throws IOException {
         String uuid = UUID.randomUUID().toString();
-        queue.add(new QueueMessage(uuid,"handle_"+uuid,body,"put type here"));
+        try {
+            queue.put(new QueueMessage(uuid, "handle_" + uuid, body, "put type here"));
+        }catch (InterruptedException ie){
+            throw new RuntimeException(ie);
+        }
     }
 
     @Override