You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/11/09 22:14:31 UTC

usergrid git commit: fix queue manager impl

Repository: usergrid
Updated Branches:
  refs/heads/jackson-exclusion 5222b9a04 -> 9fd657b84


fix queue manager impl


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9fd657b8
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9fd657b8
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9fd657b8

Branch: refs/heads/jackson-exclusion
Commit: 9fd657b84ee86af50ee207e2b5d60054964c30e0
Parents: 5222b9a
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Nov 9 14:14:28 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Nov 9 14:14:28 2015 -0700

----------------------------------------------------------------------
 .../persistence/queue/DefaultQueueManager.java  | 61 +++++++++++++-------
 1 file changed, 40 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fd657b8/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
index 5201279..c3b5917 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
@@ -1,3 +1,4 @@
+
 /*
  *
  *  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -18,33 +19,43 @@
  *
  */
 
-package org.apache.usergrid.persistence.queue;
+    package org.apache.usergrid.persistence.queue;
 
-import rx.Observable;
+    import rx.Observable;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
+    import java.io.IOException;
+    import java.util.AbstractQueue;
+    import java.io.Serializable;
+    import java.util.ArrayList;
+    import java.util.List;
+    import java.util.UUID;
+    import java.util.concurrent.ArrayBlockingQueue;
+    import java.util.concurrent.LinkedBlockingQueue;
+    import java.util.concurrent.PriorityBlockingQueue;
+    import java.util.concurrent.TimeUnit;
 
 /**
  * Default queue manager implementation, uses in memory linked queue
  */
 public class DefaultQueueManager implements QueueManager {
     public ArrayBlockingQueue<QueueMessage> queue = new ArrayBlockingQueue<>(10000);
+
     @Override
-    public synchronized Observable<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
+    public    Observable<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
         List<QueueMessage> returnQueue = new ArrayList<>();
-        for(int i=0;i<limit;i++){
-            if(!queue.isEmpty()){
-                returnQueue.add( queue.remove());
-            }else{
-                break;
-            }
+        try {
+            QueueMessage message=null;
+            int count = 5;
+            do {
+                message = queue.poll(100, TimeUnit.MILLISECONDS);
+                if (message != null) {
+                    returnQueue.add(message);
+                }
+            }while(message!=null && count-->0);
+        }catch (InterruptedException ie){
+            throw new RuntimeException(ie);
         }
-        return Observable.from( returnQueue);
+        return rx.Observable.from(returnQueue);
     }
 
     @Override
@@ -61,10 +72,14 @@ public class DefaultQueueManager implements QueueManager {
     }
 
     @Override
-    public synchronized void sendMessages(List bodies) throws IOException {
+    public  void sendMessages(List bodies) throws IOException {
         for(Object body : bodies){
             String uuid = UUID.randomUUID().toString();
-            queue.add(new QueueMessage(uuid,"handle_"+uuid,body,"putappriate type here"));
+            try {
+                queue.put(new QueueMessage(uuid, "handle_" + uuid, body, "put type here"));
+            }catch (InterruptedException ie){
+                throw new RuntimeException(ie);
+            }
         }
     }
 
@@ -72,14 +87,18 @@ public class DefaultQueueManager implements QueueManager {
     @Override
     public <T extends Serializable> void sendMessage( final T body ) throws IOException {
         String uuid = UUID.randomUUID().toString();
-        queue.add(new QueueMessage(uuid,"handle_"+uuid,body,"put type here"));
-
+        try {
+            queue.offer(new QueueMessage(uuid, "handle_" + uuid, body, "put type here"),5000,TimeUnit.MILLISECONDS);
+        }catch (InterruptedException ie){
+            throw new RuntimeException(ie);
+        }
     }
 
 
+
     @Override
     public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
-       sendMessage( body );
+        sendMessage( body );
     }