You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/06/19 13:31:07 UTC
svn commit: r1686390 -
/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
Author: kwall
Date: Fri Jun 19 11:31:07 2015
New Revision: 1686390
URL: http://svn.apache.org/r1686390
Log:
QPID-6601: [Java Broker] Log CHN-1004 at most once per channel and CHN-1002 only if message polling is not used
Also reordered channel close messages so that channel close message follows the subscription close message(s).
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1686390&r1=1686389&r2=1686390&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Jun 19 11:31:07 2015
@@ -114,7 +114,6 @@ import org.apache.qpid.server.virtualhos
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.Ticker;
public class AMQChannel
@@ -217,6 +216,15 @@ public class AMQChannel
private boolean _wireBlockingState;
+ /** Flag recording if this channel has already written operational logging for prefetch size */
+ private boolean _prefetchLoggedForChannel = false;
+
+ /**
+ * Handles special case where consumer is polling for messages using qos/flow. Avoids the per-message
+ * production of channel flow and prefetch operational logging.
+ */
+ private boolean _logChannelFlowMessages = true;
+
public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
{
_creditManager = new Pre0_10CreditManager(0l,0l, connection);
@@ -876,50 +884,47 @@ public class AMQChannel
return;
}
- LogMessage operationalLogMessage = cause == null ?
- ChannelMessages.CLOSE() :
- ChannelMessages.CLOSE_FORCED(cause.getCode(), message);
- getVirtualHost().getEventLogger().message(_logSubject, operationalLogMessage);
-
- unsubscribeAllConsumers();
- setDefaultQueue(null);
- for (Action<? super AMQChannel> task : _taskList)
+ try
{
- task.performAction(this);
- }
-
+ unsubscribeAllConsumers();
+ setDefaultQueue(null);
+ for (Action<? super AMQChannel> task : _taskList)
+ {
+ task.performAction(this);
+ }
- _transaction.rollback();
+ _transaction.rollback();
- try
- {
requeue();
}
- catch (TransportException e)
+ finally
{
- _logger.error("Caught TransportException whilst attempting to requeue:" + e);
+ LogMessage operationalLogMessage = cause == null ?
+ ChannelMessages.CLOSE() :
+ ChannelMessages.CLOSE_FORCED(cause.getCode(), message);
+ getVirtualHost().getEventLogger().message(_logSubject, operationalLogMessage);
}
}
private void unsubscribeAllConsumers()
{
- if (_logger.isInfoEnabled())
+ if (_logger.isDebugEnabled())
{
if (!_tag2SubscriptionTargetMap.isEmpty())
{
- _logger.info("Unsubscribing all consumers on channel " + toString());
+ _logger.debug("Unsubscribing all consumers on channel " + toString());
}
else
{
- _logger.info("No consumers to unsubscribe on channel " + toString());
+ _logger.debug("No consumers to unsubscribe on channel " + toString());
}
}
for (Map.Entry<AMQShortString, ConsumerTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet())
{
- if (_logger.isInfoEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
+ _logger.debug("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
}
Collection<ConsumerImpl> subs = me.getValue().getConsumers();
@@ -976,9 +981,9 @@ public class AMQChannel
if (!messagesToBeDelivered.isEmpty())
{
- if (_logger.isInfoEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.info("Requeuing " + messagesToBeDelivered.size() + " unacked messages. for " + toString());
+ _logger.debug("Requeuing " + messagesToBeDelivered.size() + " unacked messages. for " + toString());
}
}
@@ -1104,11 +1109,11 @@ public class AMQChannel
} // for all messages
// } else !isSuspend
- if (_logger.isInfoEnabled())
+ if (_logger.isDebugEnabled())
{
if (!msgToRequeue.isEmpty())
{
- _logger.info("Preparing (" + msgToRequeue.size() + ") message to requeue to.");
+ _logger.debug("Preparing (" + msgToRequeue.size() + ") message to requeue to.");
}
}
@@ -1171,7 +1176,7 @@ public class AMQChannel
if (wasSuspended != suspended)
{
// Log Flow Started before we start the subscriptions
- if (!suspended)
+ if (!suspended && _logChannelFlowMessages)
{
getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW("Started"));
}
@@ -1225,7 +1230,7 @@ public class AMQChannel
// Log Suspension only after we have confirmed all suspensions are
// stopped.
- if (suspended)
+ if (suspended && _logChannelFlowMessages)
{
getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW("Stopped"));
}
@@ -1355,14 +1360,18 @@ public class AMQChannel
return _connection;
}
- public FlowCreditManager getCreditManager()
- {
- return _creditManager;
- }
-
public void setCredit(final long prefetchSize, final int prefetchCount)
{
- getVirtualHost().getEventLogger().message(ChannelMessages.PREFETCH_SIZE(prefetchSize, prefetchCount));
+ if (!_prefetchLoggedForChannel)
+ {
+ getVirtualHost().getEventLogger().message(ChannelMessages.PREFETCH_SIZE(prefetchSize, prefetchCount));
+ _prefetchLoggedForChannel = true;
+ }
+
+ if (prefetchCount <= 1 && prefetchSize == 0 )
+ {
+ _logChannelFlowMessages = false;
+ }
_creditManager.setCreditLimits(prefetchSize, prefetchCount);
}
@@ -2215,7 +2224,7 @@ public class AMQChannel
MessageSource queue = queueName == null ? getDefaultQueue() : vHost.getMessageSource(queueName.toString());
if (queue == null)
{
- _logger.info("No queue for '" + queueName + "'");
+ _logger.debug("No queue for '" + queueName + "'");
if (queueName != null)
{
_connection.closeConnection(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'", _channelId);
@@ -3126,7 +3135,7 @@ public class AMQChannel
if (_logger.isInfoEnabled())
{
- _logger.info("Binding queue "
+ _logger.debug("Binding queue "
+ queue
+ " to exchange "
+ exch
@@ -3221,7 +3230,7 @@ public class AMQChannel
queue.getConsumerCount());
_connection.writeFrame(responseBody.generateFrame(getChannelId()));
- _logger.info("Queue " + queueName + " declared successfully");
+ _logger.debug("Queue " + queueName + " declared successfully");
}
}
}
@@ -3271,7 +3280,7 @@ public class AMQChannel
queue.getConsumerCount());
_connection.writeFrame(responseBody.generateFrame(getChannelId()));
- _logger.info("Queue " + queueName + " declared successfully");
+ _logger.debug("Queue " + queueName + " declared successfully");
}
}
catch (QueueExistsException qe)
@@ -3327,7 +3336,7 @@ public class AMQChannel
queue.getConsumerCount());
_connection.writeFrame(responseBody.generateFrame(getChannelId()));
- _logger.info("Queue " + queueName + " declared successfully");
+ _logger.debug("Queue " + queueName + " declared successfully");
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org