You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/07/28 18:44:39 UTC
svn commit: r1693130 - in /qpid/java/trunk: ./
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpid/client/protocol/
systests/src/main/java/org/apache/qpid/test/utils/
Author: kwall
Date: Tue Jul 28 16:44:38 2015
New Revision: 1693130
URL: http://svn.apache.org/r1693130
Log:
QPID-6663: [Java Client] Stop using system property amqj.protocol.logging.level to control clients logging [0-8..0-91]
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
qpid/java/trunk/pom.xml
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1693130&r1=1693129&r2=1693130&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Jul 28 16:44:38 2015
@@ -21,7 +21,6 @@
package org.apache.qpid.client;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -133,7 +132,6 @@ public abstract class BasicMessageConsum
private final boolean _autoClose;
private final boolean _browseOnly;
- private List<StackTraceElement> _closedStack = null;
private boolean _isDurableSubscriber = false;
private int _addressType = AMQDestination.UNKNOWN_TYPE;
@@ -585,18 +583,6 @@ public abstract class BasicMessageConsum
if (!setClosed())
{
setClosing(true);
- if (_logger.isDebugEnabled())
- {
- StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
- if (_closedStack != null)
- {
- _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
- }
- else
- {
- _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
- }
- }
if (sendClose)
{
@@ -673,27 +659,7 @@ public abstract class BasicMessageConsum
*/
void markClosed()
{
- // synchronized (_closed)
- {
- setClosed();
-
- if (_logger.isDebugEnabled())
- {
- if (_closedStack != null)
- {
- StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
- _logger.debug(_consumerTag + " markClosed():"
- + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
- _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
- }
- else
- {
- StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
- _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
- }
- }
- }
-
+ setClosed();
deregisterConsumer();
}
@@ -850,24 +816,7 @@ public abstract class BasicMessageConsum
void notifyError(Throwable cause)
{
- // synchronized (_closed)
- {
- setClosed();
- if (_logger.isDebugEnabled())
- {
- StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
- if (_closedStack != null)
- {
- _logger.debug(_consumerTag + " notifyError():"
- + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
- _logger.debug(_consumerTag + " previously" + _closedStack.toString());
- }
- else
- {
- _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
- }
- }
- }
+ setClosed();
// QPID-293 can "request redelivery of this error through dispatcher"
// we have no way of propagating the exception to a message listener - a JMS limitation - so we
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1693130&r1=1693129&r2=1693130&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Tue Jul 28 16:44:38 2015
@@ -74,57 +74,10 @@ import org.apache.qpid.transport.network
import org.apache.qpid.transport.network.TransportActivity;
import org.apache.qpid.util.BytesDataOutput;
-/**
- * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
- * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the
- * specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the
- * event on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP,
- * expressed in terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in
- * terms of "message received" and so on.
- * <p>
- * There is a 1:1 mapping between an AMQProtocolHandler and an {@link AMQConnection}. The connection class is
- * exposed to the end user of the AMQP client API, and also implements the JMS Connection API, so provides the public
- * API calls through which an individual connection can be manipulated. This protocol handler talks to the network
- * through MINA, in a behind the scenes role; it is not an exposed part of the client API.
- * <p>
- * There is a 1:many mapping between an AMQProtocolHandler and a set of {@link AMQSession}s. At the MINA level,
- * there is one session per connection. At the AMQP level there can be many channels which are also called sessions in
- * JMS parlance. The {@link AMQSession}s are managed through an {@link AMQProtocolSession} instance. The protocol
- * session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA sessions
- * in the event of failover. See below for more information about this.
- * <p>
- * Mina provides a session container that can be used to store/retrieve arbitrary objects as String named
- * attributes. A more convenient, type-safe, container for session data is provided in the form of
- * {@link AMQProtocolSession}.
- *
- * <p>
- * A common way to use MINA is to have a single instance of the event handler, and for MINA to pass in its session
- * object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe wrapper
- * as described above). This event handler is different, because dealing with failover complicates things. To the
- * end client of an AMQConnection, a failed over connection is still handled through the same connection instance, but
- * behind the scenes a new transport connection, and MINA session will have been created. The MINA session object cannot
- * be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the old
- * connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection
- * and the protocol session data is held outside of the MINA IOSession.
- * <p>
- * This handler is responsible for setting up the filter chain to filter all events for this handler through.
- * The filter chain is set up as a stack of event handers that perform the following functions (working upwards from
- * the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work,
- * optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself.
- * <p>
- * TODO Use a single handler instance, by shifting everything to do with the 'protocol session' state, including
- * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
- * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
- * be merged, although there is sense in keeping the session model separate. Will clarify things by having data
- * held per protocol handler, per protocol session, per network connection, per channel, in separate classes, so
- * that lifecycles of the fields match lifecycles of their containing objects.
- */
public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver, TransportActivity
{
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
- private static final Logger _protocolLogger = LoggerFactory.getLogger("qpid.protocol");
- private static final boolean PROTOCOL_DEBUG = (System.getProperty("amqj.protocol.logging.level") != null);
private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
private static final String AMQJ_DEFAULT_SYNCWRITE_TIMEOUT = "amqj.default_syncwrite_timeout";
@@ -133,10 +86,10 @@ public class AMQProtocolHandler implemen
* The connection that this protocol handler is associated with. There is a 1-1 mapping between connection
* instances and protocol handler instances.
*/
- private AMQConnection _connection;
+ private final AMQConnection _connection;
/** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */
- private volatile AMQProtocolSession _protocolSession;
+ private final AMQProtocolSession _protocolSession;
/** Holds the state of the protocol session. */
private AMQStateManager _stateManager;
@@ -149,7 +102,7 @@ public class AMQProtocolHandler implemen
* to be able to send errors during failover back to the client application. The session won't be available in the
* case where we failing over due to a Connection.Redirect message from the broker.
*/
- private FailoverHandler _failoverHandler;
+ private final FailoverHandler _failoverHandler;
/**
* This flag is used to track whether failover is being attempted. It is used to prevent the application constantly
@@ -188,6 +141,11 @@ public class AMQProtocolHandler implemen
private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
private Throwable _initialConnectionException;
+ private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
+ private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY];
+ private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes);
+ private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes);
+
/**
* Creates a new protocol handler, associated with the specified client connection instance.
*
@@ -473,42 +431,39 @@ public class AMQProtocolHandler implemen
for (int i = 0; i < size; i++)
{
AMQDataBlock message = dataBlocks.get(i);
- if (PROTOCOL_DEBUG)
- {
- _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
- }
+ _logger.debug("RECV: {}", message);
- if(message instanceof AMQFrame)
- {
+ if(message instanceof AMQFrame)
+ {
- final long msgNumber = ++_messageReceivedCount;
+ final long msgNumber = ++_messageReceivedCount;
- if (((msgNumber % 1000) == 0) && _logger.isDebugEnabled())
- {
- _logger.debug("Received " + _messageReceivedCount + " protocol messages");
- }
+ if (((msgNumber % 1000) == 0) && _logger.isDebugEnabled())
+ {
+ _logger.debug("Received {} protocol messages", _messageReceivedCount);
+ }
- AMQFrame frame = (AMQFrame) message;
+ AMQFrame frame = (AMQFrame) message;
- final AMQBody bodyFrame = frame.getBodyFrame();
+ final AMQBody bodyFrame = frame.getBodyFrame();
- bodyFrame.handle(frame.getChannel(), _protocolSession);
+ bodyFrame.handle(frame.getChannel(), _protocolSession);
- _connection.bytesReceived(_readBytes);
- }
- else if (message instanceof ProtocolInitiation)
- {
- // We get here if the server sends a response to our initial protocol header
- // suggesting an alternate ProtocolVersion; the server will then close the
- // connection.
- ProtocolInitiation protocolInit = (ProtocolInitiation) message;
- _suggestedProtocolVersion = protocolInit.checkVersion();
- _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion);
+ _connection.bytesReceived(_readBytes);
+ }
+ else if (message instanceof ProtocolInitiation)
+ {
+ // We get here if the server sends a response to our initial protocol header
+ // suggesting an alternate ProtocolVersion; the server will then close the
+ // connection.
+ ProtocolInitiation protocolInit = (ProtocolInitiation) message;
+ _suggestedProtocolVersion = protocolInit.checkVersion();
+ _logger.info("Broker suggested using protocol version: {} ", _suggestedProtocolVersion);
- // get round a bug in old versions of qpid whereby the connection is not closed
- _stateManager.changeState(AMQState.CONNECTION_CLOSED);
- }
+ // get round a bug in old versions of qpid whereby the connection is not closed
+ _stateManager.changeState(AMQState.CONNECTION_CLOSED);
}
+ }
}
catch (Exception e)
{
@@ -527,12 +482,6 @@ public class AMQProtocolHandler implemen
public void methodBodyReceived(final int channelId, final AMQBody bodyFrame)
throws QpidException
{
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + bodyFrame);
- }
-
final AMQMethodEvent<AMQMethodBody> evt =
new AMQMethodEvent<AMQMethodBody>(channelId, (AMQMethodBody) bodyFrame);
@@ -590,10 +539,7 @@ public class AMQProtocolHandler implemen
_sender.flush();
}
- if (PROTOCOL_DEBUG)
- {
- _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame));
- }
+ _logger.debug("SEND: {}", frame);
final long sentMessages = _messagesOut++;
@@ -601,18 +547,13 @@ public class AMQProtocolHandler implemen
if (debug && ((sentMessages % 1000) == 0))
{
- _logger.debug("Sent " + _messagesOut + " protocol messages");
+ _logger.debug("Sent {} protocol messages", _messagesOut);
}
_connection.bytesSent(_writtenBytes);
}
- private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
- private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY];
- private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes);
- private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes);
-
private ByteBuffer asByteBuffer(AMQDataBlock block)
{
final int size = (int) block.getSize();
Modified: qpid/java/trunk/pom.xml
URL: http://svn.apache.org/viewvc/qpid/java/trunk/pom.xml?rev=1693130&r1=1693129&r2=1693130&view=diff
==============================================================================
--- qpid/java/trunk/pom.xml (original)
+++ qpid/java/trunk/pom.xml Tue Jul 28 16:44:38 2015
@@ -220,7 +220,6 @@
<broker.config>${qpid.home}${file.separator}etc${file.separator}config-systests.json</broker.config>
<max_prefetch>1000</max_prefetch>
<qpid.dest_syntax>BURL</qpid.dest_syntax>
- <amqj.protocol.logging.level>debug</amqj.protocol.logging.level>
<test.port>15672</test.port>
<test.mport>18999</test.mport>
<test.cport>19099</test.cport>
Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java?rev=1693130&r1=1693129&r2=1693130&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java Tue Jul 28 16:44:38 2015
@@ -1289,8 +1289,6 @@ public class QpidBrokerTestCase extends
{
jvmOptions.putAll(_propertiesSetForBroker);
- copySystemProperty("amqj.protocol.logging.level", jvmOptions);
-
copySystemProperty("test.port", jvmOptions);
copySystemProperty("test.mport", jvmOptions);
copySystemProperty("test.cport", jvmOptions);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org