You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/11/24 14:08:57 UTC
svn commit: r1641389 -
/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
Author: rgodfrey
Date: Mon Nov 24 13:08:57 2014
New Revision: 1641389
URL: http://svn.apache.org/r1641389
Log:
QPID-6245: [Java Broker] Release the reference to the IncomingMessage once the AMQMessage has been created
Modified:
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1641389&r1=1641388&r2=1641389&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Mon Nov 24 13:08:57 2014
@@ -409,16 +409,24 @@ public class AMQChannel
_confirmedMessageCounter++;
}
Runnable finallyAction = null;
+ ContentHeaderBody contentHeader = _currentMessage.getContentHeader();
+
+ long bodySize = _currentMessage.getSize();
+ long timestamp = contentHeader.getProperties().getTimestamp();
+
try
{
+ final MessagePublishInfo messagePublishInfo = _currentMessage.getMessagePublishInfo();
+ final MessageDestination destination = _currentMessage.getDestination();
+
final MessageMetaData messageMetaData =
- new MessageMetaData(_currentMessage.getMessagePublishInfo(),
- _currentMessage.getContentHeader(),
+ new MessageMetaData(messagePublishInfo,
+ contentHeader,
getConnection().getLastReceivedTime());
final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
- final AMQMessage amqMessage = createAMQMessage(_currentMessage, handle);
+ final AMQMessage amqMessage = createAMQMessage(handle);
MessageReference reference = amqMessage.newReference();
try
{
@@ -434,7 +442,9 @@ public class AMQChannel
}
}
- if(!checkMessageUserId(_currentMessage.getContentHeader()))
+ _currentMessage = null;
+
+ if(!checkMessageUserId(contentHeader))
{
if(_confirmOnPublish)
{
@@ -444,7 +454,7 @@ public class AMQChannel
}
else
{
- final boolean immediate = _currentMessage.getMessagePublishInfo().isImmediate();
+ final boolean immediate = messagePublishInfo.isImmediate();
final InstanceProperties instanceProperties =
new InstanceProperties()
@@ -461,7 +471,7 @@ public class AMQChannel
case PERSISTENT:
return amqMessage.isPersistent();
case MANDATORY:
- return _currentMessage.getMessagePublishInfo().isMandatory();
+ return messagePublishInfo.isMandatory();
case REDELIVERED:
return false;
}
@@ -469,11 +479,11 @@ public class AMQChannel
}
};
- int enqueues = _currentMessage.getDestination().send(amqMessage,
- amqMessage.getInitialRoutingAddress(),
- instanceProperties, _transaction,
- immediate ? _immediateAction : _capacityCheckAction
- );
+ int enqueues = destination.send(amqMessage,
+ amqMessage.getInitialRoutingAddress(),
+ instanceProperties, _transaction,
+ immediate ? _immediateAction : _capacityCheckAction
+ );
if(enqueues == 0)
{
finallyAction = handleUnroutableMessage(amqMessage);
@@ -503,8 +513,6 @@ public class AMQChannel
}
finally
{
- long bodySize = _currentMessage.getSize();
- long timestamp = _currentMessage.getContentHeader().getProperties().getTimestamp();
_connection.registerMessageReceived(bodySize, timestamp);
_currentMessage = null;
}
@@ -553,7 +561,17 @@ public class AMQChannel
private Runnable handleUnroutableMessage(AMQMessage message)
{
boolean mandatory = message.isMandatory();
- String description = currentMessageDescription();
+
+ String exchangeName = message.getMessagePublishInfo().getExchange() == null
+ ? null : message.getMessagePublishInfo().getExchange().asString();
+ String routingKey = message.getMessagePublishInfo().getRoutingKey() == null
+ ? null : message.getMessagePublishInfo().getRoutingKey().asString();
+
+ final String description = String.format(
+ "[Exchange: %s, Routing key: %s]",
+ exchangeName,
+ routingKey);
+
boolean closeOnNoRoute = _connection.isCloseWhenNoRoute();
Runnable returnVal = null;
if(_logger.isDebugEnabled())
@@ -571,7 +589,7 @@ public class AMQChannel
public void run()
{
_connection.closeConnection(AMQConstant.NO_ROUTE,
- "No route for message " + currentMessageDescription(), _channelId);
+ "No route for message " + description, _channelId);
}
};
@@ -586,37 +604,18 @@ public class AMQChannel
}
_transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE,
"No Route for message "
- + currentMessageDescription(),
+ + description,
message));
}
else
{
- AMQShortString exchangeName = _currentMessage.getExchangeName();
- AMQShortString routingKey = _currentMessage.getMessagePublishInfo().getRoutingKey();
- getVirtualHost().getEventLogger().message(
- ExchangeMessages.DISCARDMSG(exchangeName == null ? null : exchangeName.asString(),
- routingKey == null ? null : routingKey.asString()));
+ getVirtualHost().getEventLogger().message(ExchangeMessages.DISCARDMSG(exchangeName, routingKey));
}
}
return returnVal;
}
- private String currentMessageDescription()
- {
- if(_currentMessage == null || !_currentMessage.allContentReceived())
- {
- throw new IllegalStateException("Cannot create message description for message: " + _currentMessage);
- }
-
- return String.format(
- "[Exchange: %s, Routing key: %s]",
- _currentMessage.getExchangeName(),
- _currentMessage.getMessagePublishInfo().getRoutingKey() == null
- ? null
- : _currentMessage.getMessagePublishInfo().getRoutingKey().toString());
- }
-
public void publishContentBody(ContentBody contentBody)
{
if (_logger.isDebugEnabled())
@@ -1347,7 +1346,7 @@ public class AMQChannel
}
- private AMQMessage createAMQMessage(IncomingMessage incomingMessage, StoredMessage<MessageMetaData> handle)
+ private AMQMessage createAMQMessage(StoredMessage<MessageMetaData> handle)
{
AMQMessage message = new AMQMessage(handle, _connection.getReference());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org