You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/15 14:39:48 UTC
svn commit: r1769842 - in
/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8:
ConsumerTarget_0_8.java UnacknowledgedMessageMap.java
UnacknowledgedMessageMapImpl.java
Author: rgodfrey
Date: Tue Nov 15 14:39:48 2016
New Revision: 1769842
URL: http://svn.apache.org/viewvc?rev=1769842&view=rev
Log:
NO-JIRA : Remove unnecessary synchronization in UnacknowledgeMesasgeMapImpl and ConsumerTarget_0_8
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1769842&r1=1769841&r2=1769842&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Tue Nov 15 14:39:48 2016
@@ -101,11 +101,8 @@ public abstract class ConsumerTarget_0_8
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
- synchronized (getChannel())
- {
- long deliveryTag = getChannel().getNextDeliveryTag();
- sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
- }
+ long deliveryTag = getChannel().getNextDeliveryTag();
+ sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
}
@@ -162,15 +159,11 @@ public abstract class ConsumerTarget_0_8
MessageReference ref = message.newReference();
InstanceProperties props = entry.getInstanceProperties();
entry.delete();
- long size;
- synchronized (getChannel())
- {
- getChannel().getConnection().setDeferFlush(batch);
- long deliveryTag = getChannel().getNextDeliveryTag();
+ getChannel().getConnection().setDeferFlush(batch);
+ long deliveryTag = getChannel().getNextDeliveryTag();
- size = sendToClient(consumer, message, props, deliveryTag);
+ sendToClient(consumer, message, props, deliveryTag);
- }
ref.release();
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java?rev=1769842&r1=1769841&r2=1769842&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java Tue Nov 15 14:39:48 2016
@@ -54,13 +54,6 @@ public interface UnacknowledgedMessageMa
MessageInstance get(long deliveryTag);
- /**
- * Get the set of delivery tags that are outstanding.
- *
- * @return a set of delivery tags
- */
- Set<Long> getDeliveryTags();
-
Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple);
void collect(long key, boolean multiple, Map<Long, MessageInstance> msgs);
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1769842&r1=1769841&r2=1769842&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Tue Nov 15 14:39:48 2016
@@ -26,19 +26,17 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.qpid.server.message.MessageInstance;
-public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
+class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
{
- private final Object _lock = new Object();
-
private Map<Long, MessageInstance> _map;
+ private volatile int _size;
private final int _prefetchLimit;
- public UnacknowledgedMessageMapImpl(int prefetchLimit)
+ UnacknowledgedMessageMapImpl(int prefetchLimit)
{
_prefetchLimit = prefetchLimit;
_map = new LinkedHashMap<>(prefetchLimit);
@@ -63,93 +61,68 @@ public class UnacknowledgedMessageMapImp
public void remove(Map<Long,MessageInstance> msgs)
{
- synchronized (_lock)
+ for (Long deliveryTag : msgs.keySet())
{
- for (Long deliveryTag : msgs.keySet())
- {
- remove(deliveryTag);
- }
+ remove(deliveryTag);
}
}
public MessageInstance remove(long deliveryTag)
{
- synchronized (_lock)
+ MessageInstance message = _map.remove(deliveryTag);
+ if(message != null)
{
-
- MessageInstance message = _map.remove(deliveryTag);
- return message;
+ _size--;
}
+ return message;
}
public void visit(Visitor visitor)
{
- synchronized (_lock)
+ for (Map.Entry<Long, MessageInstance> entry : _map.entrySet())
{
- Set<Map.Entry<Long, MessageInstance>> currentEntries = _map.entrySet();
- for (Map.Entry<Long, MessageInstance> entry : currentEntries)
- {
- visitor.callback(entry.getKey().longValue(), entry.getValue());
- }
- visitor.visitComplete();
+ visitor.callback(entry.getKey(), entry.getValue());
}
+ visitor.visitComplete();
}
public void add(long deliveryTag, MessageInstance message)
{
- synchronized (_lock)
+ if(_map.put(deliveryTag, message) == null)
{
- _map.put(deliveryTag, message);
+ _size++;
}
}
public Collection<MessageInstance> cancelAllMessages()
{
- synchronized (_lock)
- {
- Collection<MessageInstance> currentEntries = _map.values();
- _map = new LinkedHashMap<>(_prefetchLimit);
- return currentEntries;
- }
+ Collection<MessageInstance> currentEntries = _map.values();
+ _map = new LinkedHashMap<>(_prefetchLimit);
+ _size = 0;
+ return currentEntries;
}
public int size()
{
- synchronized (_lock)
- {
- return _map.size();
- }
+ return _size;
}
public void clear()
{
- synchronized (_lock)
- {
- _map.clear();
- }
+ _map.clear();
+ _size = 0;
}
public MessageInstance get(long key)
{
- synchronized (_lock)
- {
- return _map.get(key);
- }
- }
-
- public Set<Long> getDeliveryTags()
- {
- synchronized (_lock)
- {
- return _map.keySet();
- }
+ return _map.get(key);
}
public Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple)
{
if(multiple)
{
- Map<Long, MessageInstance> ackedMessageMap = new LinkedHashMap<Long, MessageInstance>();
+ Map<Long, MessageInstance> ackedMessageMap = new LinkedHashMap<>();
collect(deliveryTag, multiple, ackedMessageMap);
remove(ackedMessageMap);
List<MessageInstance> acknowledged = new ArrayList<>();
@@ -165,10 +138,7 @@ public class UnacknowledgedMessageMapImp
else
{
MessageInstance instance;
- synchronized (_lock)
- {
- instance = remove(deliveryTag);
- }
+ instance = remove(deliveryTag);
if(instance != null && instance.makeAcquisitionUnstealable(instance.getAcquiringConsumer()))
{
return Collections.singleton(instance);
@@ -183,15 +153,12 @@ public class UnacknowledgedMessageMapImp
private void collect(long key, Map<Long, MessageInstance> msgs)
{
- synchronized (_lock)
+ for (Map.Entry<Long, MessageInstance> entry : _map.entrySet())
{
- for (Map.Entry<Long, MessageInstance> entry : _map.entrySet())
+ msgs.put(entry.getKey(), entry.getValue());
+ if (entry.getKey() == key)
{
- msgs.put(entry.getKey(),entry.getValue());
- if (entry.getKey() == key)
- {
- break;
- }
+ break;
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org