You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/06/19 13:31:07 UTC

svn commit: r1686390 - /qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java

Author: kwall
Date: Fri Jun 19 11:31:07 2015
New Revision: 1686390

URL: http://svn.apache.org/r1686390
Log:
QPID-6601: [Java Broker] Log CHN-1004 at most once per channel and CHN-1002 only if message polling is not used

Also reordered the channel close messages so that the channel close message follows the subscription close message(s).

Modified:
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1686390&r1=1686389&r2=1686390&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Jun 19 11:31:07 2015
@@ -114,7 +114,6 @@ import org.apache.qpid.server.virtualhos
 import org.apache.qpid.server.virtualhost.RequiredExchangeException;
 import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.Ticker;
 
 public class AMQChannel
@@ -217,6 +216,15 @@ public class AMQChannel
 
     private boolean _wireBlockingState;
 
+    /** Flag recording if this channel has already written operational logging for prefetch size */
+    private boolean _prefetchLoggedForChannel = false;
+
+    /**
+     * Handles special case where consumer is polling for messages using qos/flow. Avoids the per-message
+     * production of channel flow and prefetch operational logging.
+     */
+    private boolean _logChannelFlowMessages = true;
+
     public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
     {
         _creditManager = new Pre0_10CreditManager(0l,0l, connection);
@@ -876,50 +884,47 @@ public class AMQChannel
             return;
         }
 
-        LogMessage operationalLogMessage = cause == null ?
-                ChannelMessages.CLOSE() :
-                ChannelMessages.CLOSE_FORCED(cause.getCode(), message);
-        getVirtualHost().getEventLogger().message(_logSubject, operationalLogMessage);
-
-        unsubscribeAllConsumers();
-        setDefaultQueue(null);
-        for (Action<? super AMQChannel> task : _taskList)
+        try
         {
-            task.performAction(this);
-        }
-
+            unsubscribeAllConsumers();
+            setDefaultQueue(null);
+            for (Action<? super AMQChannel> task : _taskList)
+            {
+                task.performAction(this);
+            }
 
-        _transaction.rollback();
+            _transaction.rollback();
 
-        try
-        {
             requeue();
         }
-        catch (TransportException e)
+        finally
         {
-            _logger.error("Caught TransportException whilst attempting to requeue:" + e);
+            LogMessage operationalLogMessage = cause == null ?
+                    ChannelMessages.CLOSE() :
+                    ChannelMessages.CLOSE_FORCED(cause.getCode(), message);
+            getVirtualHost().getEventLogger().message(_logSubject, operationalLogMessage);
         }
     }
 
     private void unsubscribeAllConsumers()
     {
-        if (_logger.isInfoEnabled())
+        if (_logger.isDebugEnabled())
         {
             if (!_tag2SubscriptionTargetMap.isEmpty())
             {
-                _logger.info("Unsubscribing all consumers on channel " + toString());
+                _logger.debug("Unsubscribing all consumers on channel " + toString());
             }
             else
             {
-                _logger.info("No consumers to unsubscribe on channel " + toString());
+                _logger.debug("No consumers to unsubscribe on channel " + toString());
             }
         }
 
         for (Map.Entry<AMQShortString, ConsumerTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet())
         {
-            if (_logger.isInfoEnabled())
+            if (_logger.isDebugEnabled())
             {
-                _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
+                _logger.debug("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
             }
 
             Collection<ConsumerImpl> subs = me.getValue().getConsumers();
@@ -976,9 +981,9 @@ public class AMQChannel
 
         if (!messagesToBeDelivered.isEmpty())
         {
-            if (_logger.isInfoEnabled())
+            if (_logger.isDebugEnabled())
             {
-                _logger.info("Requeuing " + messagesToBeDelivered.size() + " unacked messages. for " + toString());
+                _logger.debug("Requeuing " + messagesToBeDelivered.size() + " unacked messages. for " + toString());
             }
 
         }
@@ -1104,11 +1109,11 @@ public class AMQChannel
         } // for all messages
         // } else !isSuspend
 
-        if (_logger.isInfoEnabled())
+        if (_logger.isDebugEnabled())
         {
             if (!msgToRequeue.isEmpty())
             {
-                _logger.info("Preparing (" + msgToRequeue.size() + ") message to requeue to.");
+                _logger.debug("Preparing (" + msgToRequeue.size() + ") message to requeue to.");
             }
         }
 
@@ -1171,7 +1176,7 @@ public class AMQChannel
         if (wasSuspended != suspended)
         {
             // Log Flow Started before we start the subscriptions
-            if (!suspended)
+            if (!suspended && _logChannelFlowMessages)
             {
                 getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW("Started"));
             }
@@ -1225,7 +1230,7 @@ public class AMQChannel
 
             // Log Suspension only after we have confirmed all suspensions are
             // stopped.
-            if (suspended)
+            if (suspended && _logChannelFlowMessages)
             {
                 getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW("Stopped"));
             }
@@ -1355,14 +1360,18 @@ public class AMQChannel
         return _connection;
     }
 
-    public FlowCreditManager getCreditManager()
-    {
-        return _creditManager;
-    }
-
     public void setCredit(final long prefetchSize, final int prefetchCount)
     {
-        getVirtualHost().getEventLogger().message(ChannelMessages.PREFETCH_SIZE(prefetchSize, prefetchCount));
+        if (!_prefetchLoggedForChannel)
+        {
+            getVirtualHost().getEventLogger().message(ChannelMessages.PREFETCH_SIZE(prefetchSize, prefetchCount));
+            _prefetchLoggedForChannel = true;
+        }
+
+        if (prefetchCount <= 1 && prefetchSize == 0 )
+        {
+            _logChannelFlowMessages = false;
+        }
         _creditManager.setCreditLimits(prefetchSize, prefetchCount);
     }
 
@@ -2215,7 +2224,7 @@ public class AMQChannel
         MessageSource queue = queueName == null ? getDefaultQueue() : vHost.getMessageSource(queueName.toString());
         if (queue == null)
         {
-            _logger.info("No queue for '" + queueName + "'");
+            _logger.debug("No queue for '" + queueName + "'");
             if (queueName != null)
             {
                 _connection.closeConnection(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'", _channelId);
@@ -3126,7 +3135,7 @@ public class AMQChannel
 
                     if (_logger.isInfoEnabled())
                     {
-                        _logger.info("Binding queue "
+                        _logger.debug("Binding queue "
                                      + queue
                                      + " to exchange "
                                      + exch
@@ -3221,7 +3230,7 @@ public class AMQChannel
                                                                         queue.getConsumerCount());
                         _connection.writeFrame(responseBody.generateFrame(getChannelId()));
 
-                        _logger.info("Queue " + queueName + " declared successfully");
+                        _logger.debug("Queue " + queueName + " declared successfully");
                     }
                 }
             }
@@ -3271,7 +3280,7 @@ public class AMQChannel
                                                                     queue.getConsumerCount());
                     _connection.writeFrame(responseBody.generateFrame(getChannelId()));
 
-                    _logger.info("Queue " + queueName + " declared successfully");
+                    _logger.debug("Queue " + queueName + " declared successfully");
                 }
             }
             catch (QueueExistsException qe)
@@ -3327,7 +3336,7 @@ public class AMQChannel
                                                                         queue.getConsumerCount());
                         _connection.writeFrame(responseBody.generateFrame(getChannelId()));
 
-                        _logger.info("Queue " + queueName + " declared successfully");
+                        _logger.debug("Queue " + queueName + " declared successfully");
                     }
                 }
             }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org