You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/15 12:07:41 UTC

svn commit: r1769795 - in /qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue: AbstractQueue.java QueueConsumerManagerImpl.java

Author: rgodfrey
Date: Tue Nov 15 12:07:41 2016
New Revision: 1769795

URL: http://svn.apache.org/viewvc?rev=1769795&view=rev
Log:
Iterators over the interested list may throw a NoSuchElementException even when hasNext() had previously been called

Modified:
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1769795&r1=1769794&r2=1769795&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Nov 15 12:07:41 2016
@@ -58,7 +58,6 @@ import java.util.zip.GZIPOutputStream;
 
 import javax.security.auth.Subject;
 
-import com.google.common.collect.Lists;
 import com.google.common.io.ByteStreams;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -285,7 +284,7 @@ public abstract class AbstractQueue<X ex
                 while (consumerIterator.hasNext())
                 {
                     QueueConsumer<?> queueConsumer = consumerIterator.next();
-                    if (queueConsumer.getPriority() < highestNotifiedPriority || notifyConsumer(queueConsumer))
+                    if (queueConsumer != null && (queueConsumer.getPriority() < highestNotifiedPriority || notifyConsumer(queueConsumer))) // null guard covers both operands; a null entry is skipped, never passed to notifyConsumer
                     {
                         break;
                     }
@@ -1043,7 +1042,7 @@ public abstract class AbstractQueue<X ex
     @Override
     public Collection<QueueConsumer<?>> getConsumers()
     {
-        return Lists.newArrayList(_queueConsumerManager.getAllIterator());
+        return getQueueConsumersAsList();
     }
 
 
@@ -1324,7 +1323,7 @@ public abstract class AbstractQueue<X ex
             QueueConsumer<?> sub = consumerIterator.next();
 
             // we don't make browsers send the same stuff twice
-            if (sub.seesRequeues())
+            if (sub != null && sub.seesRequeues())
             {
                 updateSubRequeueEntry(sub, entry);
             }
@@ -1830,7 +1829,7 @@ public abstract class AbstractQueue<X ex
         while (nonAcquiringIterator.hasNext())
         {
             QueueConsumer<?> consumer = nonAcquiringIterator.next();
-            if(consumer.hasInterest(entry))
+            if(consumer != null && consumer.hasInterest(entry))
             {
                 notifyConsumer(consumer);
             }
@@ -1840,7 +1839,7 @@ public abstract class AbstractQueue<X ex
         while (entry.isAvailable() && interestedIterator.hasNext())
         {
             QueueConsumer<?> consumer = interestedIterator.next();
-            if(consumer.hasInterest(entry))
+            if(consumer != null && consumer.hasInterest(entry))
             {
                 if(notifyConsumer(consumer))
                 {
@@ -1862,12 +1861,14 @@ public abstract class AbstractQueue<X ex
         while (hasAvailableMessages() && interestedIterator.hasNext())
         {
             QueueConsumer<?> consumer = interestedIterator.next();
-
-            if (excludedConsumer != consumer)
+            if(consumer != null)
             {
-                if (notifyConsumer(consumer))
+                if (excludedConsumer != consumer)
                 {
-                    break;
+                    if (notifyConsumer(consumer))
+                    {
+                        break;
+                    }
                 }
             }
         }
@@ -2014,7 +2015,7 @@ public abstract class AbstractQueue<X ex
         while (consumerIterator.hasNext())
         {
             QueueConsumer<?> consumer = consumerIterator.next();
-            if(consumer.getPriority() > sub.getPriority())
+            if(consumer != null && consumer.getPriority() > sub.getPriority())
             {
                 if(consumer.isNotifyWorkDesired()
                    && consumer.acquires()
@@ -2958,11 +2959,26 @@ public abstract class AbstractQueue<X ex
         {
             return _queueConsumerManager == null
                     ? Collections.<C>emptySet()
-                    : (Collection<C>) Lists.newArrayList(_queueConsumerManager.getAllIterator());
+                    : (Collection<C>) getQueueConsumersAsList();
         }
         else return Collections.emptySet();
     }
 
+    // Copies the manager's consumers into a new list, leaving out any null the iterator yields.
+    private List<QueueConsumer<?>> getQueueConsumersAsList()
+    {
+        final List<QueueConsumer<?>> snapshot = new ArrayList<>(_queueConsumerManager.getAllSize());
+        for (Iterator<QueueConsumer<?>> all = _queueConsumerManager.getAllIterator(); all.hasNext(); )
+        {
+            final QueueConsumer<?> candidate = all.next();
+            if (candidate != null)
+            {
+                snapshot.add(candidate);
+            }
+        }
+        return snapshot;
+    }
+
     @Override
     protected <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(final Class<C> childClass,
                                                       final Map<String, Object> attributes,
@@ -3425,7 +3441,7 @@ public abstract class AbstractQueue<X ex
             while (consumerIterator.hasNext() && !isDeleted())
             {
                 QueueConsumer<?> sub = consumerIterator.next();
-                if(sub.acquires())
+                if(sub != null && sub.acquires())
                 {
                     getNextAvailableEntry(sub);
                 }

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java?rev=1769795&r1=1769794&r2=1769795&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java Tue Nov 15 12:07:41 2016
@@ -23,7 +23,6 @@ package org.apache.qpid.server.queue;
 import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.NoSuchElementException;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 public class QueueConsumerManagerImpl implements QueueConsumerManager
@@ -379,7 +378,9 @@ public class QueueConsumerManagerImpl im
             }
             else
             {
-                throw new NoSuchElementException();
+                // throwing exceptions is expensive, and due to concurrency a caller might get here even though they
+                // had previously checked with hasNext()
+                return null;
             }
         }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org