You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/04/16 15:44:13 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5149 - potential deadlock

Repository: activemq
Updated Branches:
  refs/heads/trunk 7646526c0 -> e94792751


https://issues.apache.org/jira/browse/AMQ-5149 - potential deadlock


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e9479275
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e9479275
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e9479275

Branch: refs/heads/trunk
Commit: e947927511796484e2ae0a066df7fa7b4800a578
Parents: 7646526
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Wed Apr 16 15:44:02 2014 +0200
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Wed Apr 16 15:44:02 2014 +0200

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    | 100 ++++++++-----------
 .../region/cursors/OrderedPendingList.java      |   9 ++
 .../broker/region/cursors/PendingList.java      |   3 +
 .../region/cursors/PrioritizedPendingList.java  |   9 ++
 .../region/cursors/OrderPendingListTest.java    |  10 ++
 5 files changed, 75 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e9479275/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index b6af75c..2f3d8bd 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -109,7 +109,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
     private final ReentrantReadWriteLock messagesLock = new ReentrantReadWriteLock();
     protected PendingMessageCursor messages;
     private final ReentrantReadWriteLock pagedInMessagesLock = new ReentrantReadWriteLock();
-    private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>();
+    private final PendingList pagedInMessages = new OrderedPendingList();
     // Messages that are paged in but have not yet been targeted at a subscription
     private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
     protected PendingList pagedInPendingDispatch = new OrderedPendingList();
@@ -1188,44 +1188,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
                 pageInMessages(!memoryUsage.isFull(110));
             };
 
-            List<MessageReference> toExpire = new ArrayList<MessageReference>();
-
-            pagedInPendingDispatchLock.writeLock().lock();
-            try {
-                addAll(pagedInPendingDispatch.values(), browseList, max, toExpire);
-                for (MessageReference ref : toExpire) {
-                    pagedInPendingDispatch.remove(ref);
-                    if (broker.isExpired(ref)) {
-                        LOG.debug("expiring from pagedInPending: {}", ref);
-                        messageExpired(connectionContext, ref);
-                    } else {
-                        ref.decrementReferenceCount();
-                    }
-                }
-            } finally {
-                pagedInPendingDispatchLock.writeLock().unlock();
-            }
-            toExpire.clear();
-            pagedInMessagesLock.readLock().lock();
-            try {
-                addAll(pagedInMessages.values(), browseList, max, toExpire);
-            } finally {
-                pagedInMessagesLock.readLock().unlock();
-            }
-            for (MessageReference ref : toExpire) {
-                if (broker.isExpired(ref)) {
-                    LOG.debug("expiring from pagedInMessages: {}", ref);
-                    messageExpired(connectionContext, ref);
-                } else {
-                    pagedInMessagesLock.writeLock().lock();
-                    try {
-                        pagedInMessages.remove(ref.getMessageId());
-                    } finally {
-                        pagedInMessagesLock.writeLock().unlock();
-                    }
-                    ref.decrementReferenceCount();
-                }
-            }
+            doBrowseList(browseList, max, pagedInPendingDispatch, pagedInPendingDispatchLock, connectionContext, "pagedInPendingDispatch");
+            doBrowseList(browseList, max, pagedInMessages, pagedInMessagesLock, connectionContext, "pagedInMessages");
 
             // we need a store iterator to walk messages on disk, independent of the cursor which is tracking
             // the next message batch
@@ -1234,6 +1198,30 @@ public class Queue extends BaseDestination implements Task, UsageListener {
         }
     }
 
+    protected void doBrowseList(List<Message> browseList, int max, PendingList list, ReentrantReadWriteLock lock, ConnectionContext connectionContext, String name) throws Exception { // shared browse step for one paged-in list guarded by its own lock
+        List<MessageReference> toExpire = new ArrayList<MessageReference>(); // filled by addAll below, processed after the read lock is released
+        lock.readLock().lock(); // the read lock covers only the pass over list.values()
+        try {
+            addAll(list.values(), browseList, max, toExpire); // helper not shown in this hunk; presumably copies up to max messages into browseList and collects the rest in toExpire - confirm
+        } finally {
+            lock.readLock().unlock();
+        }
+        for (MessageReference ref : toExpire) { // runs with neither side of lock held by this method
+            if (broker.isExpired(ref)) {
+                LOG.debug("expiring from {}: {}", name, ref); // name is used only to label this log line
+                messageExpired(connectionContext, ref); // invoked without holding lock; NOTE(review): presumably the AMQ-5149 deadlock avoidance - confirm
+            } else {
+                lock.writeLock().lock(); // write lock is taken per reference, only around the removal; the read lock was already released (ReentrantReadWriteLock cannot upgrade read to write)
+                try {
+                    list.remove(ref);
+                } finally {
+                    lock.writeLock().unlock();
+                }
+                ref.decrementReferenceCount(); // done after the write lock is released
+            }
+        }
+    }
+
     private boolean shouldPageInMoreForBrowse(int max) {
         int alreadyPagedIn = 0;
         pagedInMessagesLock.readLock().lock();
@@ -1264,7 +1252,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
         MessageId msgId = new MessageId(id);
         pagedInMessagesLock.readLock().lock();
         try {
-            QueueMessageReference ref = this.pagedInMessages.get(msgId);
+            QueueMessageReference ref = (QueueMessageReference)this.pagedInMessages.get(msgId);
             if (ref != null) {
                 return ref;
             }
@@ -1535,7 +1523,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
     public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
             ActiveMQDestination dest, int maximumMessages) throws Exception {
         int movedCounter = 0;
-        Set<QueueMessageReference> set = new LinkedHashSet<QueueMessageReference>();
+        Set<MessageReference> set = new LinkedHashSet<MessageReference>();
         do {
             doPageIn(true);
             pagedInMessagesLock.readLock().lock();
@@ -1544,11 +1532,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
             } finally {
                 pagedInMessagesLock.readLock().unlock();
             }
-            List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set);
-            for (QueueMessageReference ref : list) {
+            List<MessageReference> list = new ArrayList<MessageReference>(set);
+            for (MessageReference ref : list) {
                 if (filter.evaluate(context, ref)) {
                     // We should only move messages that can be locked.
-                    moveMessageTo(context, ref, dest);
+                    moveMessageTo(context, (QueueMessageReference)ref, dest);
                     set.remove(ref);
                     if (++movedCounter >= maximumMessages && maximumMessages > 0) {
                         return movedCounter;
@@ -1564,7 +1552,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
             throw new Exception("Retry of message is only possible on Dead Letter Queues!");
         }
         int restoredCounter = 0;
-        Set<QueueMessageReference> set = new LinkedHashSet<QueueMessageReference>();
+        Set<MessageReference> set = new LinkedHashSet<MessageReference>();
         do {
             doPageIn(true);
             pagedInMessagesLock.readLock().lock();
@@ -1573,11 +1561,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
             } finally {
                 pagedInMessagesLock.readLock().unlock();
             }
-            List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set);
-            for (QueueMessageReference ref : list) {
+            List<MessageReference> list = new ArrayList<MessageReference>(set);
+            for (MessageReference ref : list) {
                 if (ref.getMessage().getOriginalDestination() != null) {
 
-                    moveMessageTo(context, ref, ref.getMessage().getOriginalDestination());
+                    moveMessageTo(context, (QueueMessageReference)ref, ref.getMessage().getOriginalDestination());
                     set.remove(ref);
                     if (++restoredCounter >= maximumMessages && maximumMessages > 0) {
                         return restoredCounter;
@@ -1672,10 +1660,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
             }
 
             if (hasBrowsers) {
-                ArrayList<QueueMessageReference> alreadyDispatchedMessages = null;
+                ArrayList<MessageReference> alreadyDispatchedMessages = null;
                 pagedInMessagesLock.readLock().lock();
                 try{
-                    alreadyDispatchedMessages = new ArrayList<QueueMessageReference>(pagedInMessages.values());
+                    alreadyDispatchedMessages = new ArrayList<MessageReference>(pagedInMessages.values());
                 }finally {
                     pagedInMessagesLock.readLock().unlock();
                 }
@@ -1691,8 +1679,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
 
                         LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, alreadyDispatchedMessages.size());
                         boolean added = false;
-                        for (QueueMessageReference node : alreadyDispatchedMessages) {
-                            if (!node.isAcked() && !browser.isDuplicate(node.getMessageId()) && !browser.atMax()) {
+                        for (MessageReference node : alreadyDispatchedMessages) {
+                            if (!((QueueMessageReference)node).isAcked() && !browser.isDuplicate(node.getMessageId()) && !browser.atMax()) {
                                 msgContext.setMessageReference(node);
                                 if (browser.matches(node, msgContext)) {
                                     browser.add(node);
@@ -1830,7 +1818,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
             destinationStatistics.getMessages().decrement();
             pagedInMessagesLock.writeLock().lock();
             try {
-                pagedInMessages.remove(reference.getMessageId());
+                pagedInMessages.remove(reference);
             } finally {
                 pagedInMessagesLock.writeLock().unlock();
             }
@@ -1996,8 +1984,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
                     resultList = new OrderedPendingList();
                 }
                 for (QueueMessageReference ref : result) {
-                    if (!pagedInMessages.containsKey(ref.getMessageId())) {
-                        pagedInMessages.put(ref.getMessageId(), ref);
+                    if (!pagedInMessages.contains(ref)) {
+                        pagedInMessages.addMessageLast(ref);
                         resultList.addMessageLast(ref);
                     } else {
                         ref.decrementReferenceCount();
@@ -2260,7 +2248,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
         if (message == null) {
             pagedInMessagesLock.readLock().lock();
             try {
-                message = pagedInMessages.get(messageId);
+                message = (QueueMessageReference)pagedInMessages.get(messageId);
             } finally {
                 pagedInMessagesLock.readLock().unlock();
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e9479275/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
index 33062e7..9bf9588 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
@@ -164,4 +164,13 @@ public class OrderedPendingList implements PendingList {
             }
         }
     }
+
+    @Override
+    public MessageReference get(MessageId messageId) { // looks up a pending message by its id; returns null when none is mapped
+        PendingNode node = map.get(messageId); // map is a field declared outside this hunk
+        if (node != null) {
+            return node.getMessage();
+        }
+        return null; // no node is mapped for messageId
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e9479275/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
index a44d80e..153d8bd 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
@@ -20,6 +20,7 @@ import java.util.Collection;
 import java.util.Iterator;
 
 import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.MessageId;
 
 public interface PendingList extends Iterable<MessageReference> {
 
@@ -108,4 +109,6 @@ public interface PendingList extends Iterable<MessageReference> {
      *      The PendingList that is to be added to this collection.
      */
     public void addAll(PendingList pendingList);
+
+    public MessageReference get(MessageId messageId); // lookup by message id; the implementations added in this commit return null when no matching message is found
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e9479275/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
index 0772b20..9235b2c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
@@ -156,4 +156,13 @@ public class PrioritizedPendingList implements PendingList {
         }
     }
 
+    @Override
+    public MessageReference get(MessageId messageId) { // looks up a pending message by its id; returns null when none is mapped
+        PendingNode node = map.get(messageId); // map is a field declared outside this hunk
+        if (node != null) {
+            return node.getMessage();
+        }
+        return null; // no node is mapped for messageId
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e9479275/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
index 05308de..79d7e6c 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
@@ -329,6 +329,16 @@ public class OrderPendingListTest {
                 theList.add(messageReference);
             }
         }
+
+        @Override
+        public MessageReference get(MessageId messageId) { // test-list lookup: linear scan of theList, first match wins
+            for(MessageReference messageReference : theList) {
+                if (messageReference.getMessageId().equals(messageId)) { // matches on MessageId equality
+                    return messageReference;
+                }
+            }
+            return null; // no element of theList carries messageId
+        }
     }
 
     static class TestMessageReference implements MessageReference {