You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/11/24 14:08:57 UTC

svn commit: r1641389 - /qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java

Author: rgodfrey
Date: Mon Nov 24 13:08:57 2014
New Revision: 1641389

URL: http://svn.apache.org/r1641389
Log:
QPID-6245 : [Java Broker] release reference to IncomingMessage once AMQMessage has been created

Modified:
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1641389&r1=1641388&r2=1641389&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Mon Nov 24 13:08:57 2014
@@ -409,16 +409,24 @@ public class AMQChannel
                 _confirmedMessageCounter++;
             }
             Runnable finallyAction = null;
+            ContentHeaderBody contentHeader = _currentMessage.getContentHeader();
+
+            long bodySize = _currentMessage.getSize();
+            long timestamp = contentHeader.getProperties().getTimestamp();
+
             try
             {
 
+                final MessagePublishInfo messagePublishInfo = _currentMessage.getMessagePublishInfo();
+                final MessageDestination destination = _currentMessage.getDestination();
+
                 final MessageMetaData messageMetaData =
-                        new MessageMetaData(_currentMessage.getMessagePublishInfo(),
-                                            _currentMessage.getContentHeader(),
+                        new MessageMetaData(messagePublishInfo,
+                                            contentHeader,
                                             getConnection().getLastReceivedTime());
 
                 final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
-                final AMQMessage amqMessage = createAMQMessage(_currentMessage, handle);
+                final AMQMessage amqMessage = createAMQMessage(handle);
                 MessageReference reference = amqMessage.newReference();
                 try
                 {
@@ -434,7 +442,9 @@ public class AMQChannel
                         }
                     }
 
-                    if(!checkMessageUserId(_currentMessage.getContentHeader()))
+                    _currentMessage = null;
+
+                    if(!checkMessageUserId(contentHeader))
                     {
                         if(_confirmOnPublish)
                         {
@@ -444,7 +454,7 @@ public class AMQChannel
                     }
                     else
                     {
-                        final boolean immediate = _currentMessage.getMessagePublishInfo().isImmediate();
+                        final boolean immediate = messagePublishInfo.isImmediate();
 
                         final InstanceProperties instanceProperties =
                                 new InstanceProperties()
@@ -461,7 +471,7 @@ public class AMQChannel
                                             case PERSISTENT:
                                                 return amqMessage.isPersistent();
                                             case MANDATORY:
-                                                return _currentMessage.getMessagePublishInfo().isMandatory();
+                                                return messagePublishInfo.isMandatory();
                                             case REDELIVERED:
                                                 return false;
                                         }
@@ -469,11 +479,11 @@ public class AMQChannel
                                     }
                                 };
 
-                        int enqueues = _currentMessage.getDestination().send(amqMessage,
-                                                                             amqMessage.getInitialRoutingAddress(),
-                                                                             instanceProperties, _transaction,
-                                                                          immediate ? _immediateAction : _capacityCheckAction
-                                                                            );
+                        int enqueues = destination.send(amqMessage,
+                                                        amqMessage.getInitialRoutingAddress(),
+                                                        instanceProperties, _transaction,
+                                                        immediate ? _immediateAction : _capacityCheckAction
+                                                       );
                         if(enqueues == 0)
                         {
                             finallyAction = handleUnroutableMessage(amqMessage);
@@ -503,8 +513,6 @@ public class AMQChannel
             }
             finally
             {
-                long bodySize = _currentMessage.getSize();
-                long timestamp = _currentMessage.getContentHeader().getProperties().getTimestamp();
                 _connection.registerMessageReceived(bodySize, timestamp);
                 _currentMessage = null;
             }
@@ -553,7 +561,17 @@ public class AMQChannel
     private Runnable handleUnroutableMessage(AMQMessage message)
     {
         boolean mandatory = message.isMandatory();
-        String description = currentMessageDescription();
+
+        String exchangeName = message.getMessagePublishInfo().getExchange() == null
+                ? null : message.getMessagePublishInfo().getExchange().asString();
+        String routingKey = message.getMessagePublishInfo().getRoutingKey() == null
+                ? null : message.getMessagePublishInfo().getRoutingKey().asString();
+
+        final String description = String.format(
+                "[Exchange: %s, Routing key: %s]",
+                exchangeName,
+                routingKey);
+
         boolean closeOnNoRoute = _connection.isCloseWhenNoRoute();
         Runnable returnVal = null;
         if(_logger.isDebugEnabled())
@@ -571,7 +589,7 @@ public class AMQChannel
                             public void run()
                             {
                                 _connection.closeConnection(AMQConstant.NO_ROUTE,
-                                        "No route for message " + currentMessageDescription(), _channelId);
+                                        "No route for message " + description, _channelId);
 
                             }
                         };
@@ -586,37 +604,18 @@ public class AMQChannel
                 }
                 _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE,
                                                                             "No Route for message "
-                                                                            + currentMessageDescription(),
+                                                                            + description,
                                                                             message));
             }
             else
             {
-                AMQShortString exchangeName = _currentMessage.getExchangeName();
-                AMQShortString routingKey = _currentMessage.getMessagePublishInfo().getRoutingKey();
 
-                getVirtualHost().getEventLogger().message(
-                        ExchangeMessages.DISCARDMSG(exchangeName == null ? null : exchangeName.asString(),
-                                                    routingKey == null ? null : routingKey.asString()));
+                getVirtualHost().getEventLogger().message(ExchangeMessages.DISCARDMSG(exchangeName, routingKey));
             }
         }
         return returnVal;
     }
 
-    private String currentMessageDescription()
-    {
-        if(_currentMessage == null || !_currentMessage.allContentReceived())
-        {
-            throw new IllegalStateException("Cannot create message description for message: " + _currentMessage);
-        }
-
-        return String.format(
-                "[Exchange: %s, Routing key: %s]",
-                _currentMessage.getExchangeName(),
-                _currentMessage.getMessagePublishInfo().getRoutingKey() == null
-                        ? null
-                        : _currentMessage.getMessagePublishInfo().getRoutingKey().toString());
-    }
-
     public void publishContentBody(ContentBody contentBody)
     {
         if (_logger.isDebugEnabled())
@@ -1347,7 +1346,7 @@ public class AMQChannel
     }
 
 
-    private AMQMessage createAMQMessage(IncomingMessage incomingMessage, StoredMessage<MessageMetaData> handle)
+    private AMQMessage createAMQMessage(StoredMessage<MessageMetaData> handle)
     {
 
         AMQMessage message = new AMQMessage(handle, _connection.getReference());



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org