You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/15 14:39:48 UTC

svn commit: r1769842 - in /qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8: ConsumerTarget_0_8.java UnacknowledgedMessageMap.java UnacknowledgedMessageMapImpl.java

Author: rgodfrey
Date: Tue Nov 15 14:39:48 2016
New Revision: 1769842

URL: http://svn.apache.org/viewvc?rev=1769842&view=rev
Log:
NO-JIRA : Remove unnecessary synchronization in UnacknowledgedMessageMapImpl and ConsumerTarget_0_8

Modified:
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1769842&r1=1769841&r2=1769842&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Tue Nov 15 14:39:48 2016
@@ -101,11 +101,8 @@ public abstract class ConsumerTarget_0_8
             // We don't decrement the reference here as we don't want to consume the message
             // but we do want to send it to the client.
 
-            synchronized (getChannel())
-            {
-                long deliveryTag = getChannel().getNextDeliveryTag();
-                sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
-            }
+            long deliveryTag = getChannel().getNextDeliveryTag();
+            sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
 
         }
 
@@ -162,15 +159,11 @@ public abstract class ConsumerTarget_0_8
             MessageReference ref = message.newReference();
             InstanceProperties props = entry.getInstanceProperties();
             entry.delete();
-            long size;
-            synchronized (getChannel())
-            {
-                getChannel().getConnection().setDeferFlush(batch);
-                long deliveryTag = getChannel().getNextDeliveryTag();
+            getChannel().getConnection().setDeferFlush(batch);
+            long deliveryTag = getChannel().getNextDeliveryTag();
 
-                size = sendToClient(consumer, message, props, deliveryTag);
+            sendToClient(consumer, message, props, deliveryTag);
 
-            }
             ref.release();
 
         }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java?rev=1769842&r1=1769841&r2=1769842&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java Tue Nov 15 14:39:48 2016
@@ -54,13 +54,6 @@ public interface UnacknowledgedMessageMa
 
     MessageInstance get(long deliveryTag);
 
-    /**
-     * Get the set of delivery tags that are outstanding.
-     *
-     * @return a set of delivery tags
-     */
-    Set<Long> getDeliveryTags();
-
     Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple);
     void collect(long key, boolean multiple, Map<Long, MessageInstance> msgs);
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1769842&r1=1769841&r2=1769842&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Tue Nov 15 14:39:48 2016
@@ -26,19 +26,17 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.qpid.server.message.MessageInstance;
 
-public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
+class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
 {
-    private final Object _lock = new Object();
-
     private Map<Long, MessageInstance> _map;
+    private volatile int _size;
 
     private final int _prefetchLimit;
 
-    public UnacknowledgedMessageMapImpl(int prefetchLimit)
+    UnacknowledgedMessageMapImpl(int prefetchLimit)
     {
         _prefetchLimit = prefetchLimit;
         _map = new LinkedHashMap<>(prefetchLimit);
@@ -63,93 +61,68 @@ public class UnacknowledgedMessageMapImp
 
     public void remove(Map<Long,MessageInstance> msgs)
     {
-        synchronized (_lock)
+        for (Long deliveryTag : msgs.keySet())
         {
-            for (Long deliveryTag : msgs.keySet())
-            {
-                remove(deliveryTag);
-            }
+            remove(deliveryTag);
         }
     }
 
     public MessageInstance remove(long deliveryTag)
     {
-        synchronized (_lock)
+        MessageInstance message = _map.remove(deliveryTag);
+        if(message != null)
         {
-
-            MessageInstance message = _map.remove(deliveryTag);
-            return message;
+            _size--;
         }
+        return message;
     }
 
     public void visit(Visitor visitor)
     {
-        synchronized (_lock)
+        for (Map.Entry<Long, MessageInstance> entry : _map.entrySet())
         {
-            Set<Map.Entry<Long, MessageInstance>> currentEntries = _map.entrySet();
-            for (Map.Entry<Long, MessageInstance> entry : currentEntries)
-            {
-                visitor.callback(entry.getKey().longValue(), entry.getValue());
-            }
-            visitor.visitComplete();
+            visitor.callback(entry.getKey(), entry.getValue());
         }
+        visitor.visitComplete();
     }
 
     public void add(long deliveryTag, MessageInstance message)
     {
-        synchronized (_lock)
+        if(_map.put(deliveryTag, message) == null)
         {
-            _map.put(deliveryTag, message);
+            _size++;
         }
     }
 
     public Collection<MessageInstance> cancelAllMessages()
     {
-        synchronized (_lock)
-        {
-            Collection<MessageInstance> currentEntries = _map.values();
-            _map = new LinkedHashMap<>(_prefetchLimit);
-            return currentEntries;
-        }
+        Collection<MessageInstance> currentEntries = _map.values();
+        _map = new LinkedHashMap<>(_prefetchLimit);
+        _size = 0;
+        return currentEntries;
     }
 
     public int size()
     {
-        synchronized (_lock)
-        {
-            return _map.size();
-        }
+        return _size;
     }
 
     public void clear()
     {
-        synchronized (_lock)
-        {
-            _map.clear();
-        }
+        _map.clear();
+        _size = 0;
     }
 
     public MessageInstance get(long key)
     {
-        synchronized (_lock)
-        {
-            return _map.get(key);
-        }
-    }
-
-    public Set<Long> getDeliveryTags()
-    {
-        synchronized (_lock)
-        {
-            return _map.keySet();
-        }
+        return _map.get(key);
     }
 
     public Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple)
     {
         if(multiple)
         {
-            Map<Long, MessageInstance> ackedMessageMap = new LinkedHashMap<Long, MessageInstance>();
+            Map<Long, MessageInstance> ackedMessageMap = new LinkedHashMap<>();
             collect(deliveryTag, multiple, ackedMessageMap);
             remove(ackedMessageMap);
             List<MessageInstance> acknowledged = new ArrayList<>();
@@ -165,10 +138,7 @@ public class UnacknowledgedMessageMapImp
         else
         {
             MessageInstance instance;
-            synchronized (_lock)
-            {
-                instance = remove(deliveryTag);
-            }
+            instance = remove(deliveryTag);
             if(instance != null && instance.makeAcquisitionUnstealable(instance.getAcquiringConsumer()))
             {
                 return Collections.singleton(instance);
@@ -183,15 +153,12 @@ public class UnacknowledgedMessageMapImp
 
     private void collect(long key, Map<Long, MessageInstance> msgs)
     {
-        synchronized (_lock)
+        for (Map.Entry<Long, MessageInstance> entry : _map.entrySet())
         {
-            for (Map.Entry<Long, MessageInstance> entry : _map.entrySet())
+            msgs.put(entry.getKey(), entry.getValue());
+            if (entry.getKey() == key)
             {
-                msgs.put(entry.getKey(),entry.getValue());
-                if (entry.getKey() == key)
-                {
-                    break;
-                }
+                break;
             }
         }
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org