You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/07/28 18:44:39 UTC

svn commit: r1693130 - in /qpid/java/trunk: ./ client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/protocol/ systests/src/main/java/org/apache/qpid/test/utils/

Author: kwall
Date: Tue Jul 28 16:44:38 2015
New Revision: 1693130

URL: http://svn.apache.org/r1693130
Log:
QPID-6663: [Java Client] Stop using system property amqj.protocol.logging.level to control clients logging [0-8..0-91]

Modified:
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    qpid/java/trunk/pom.xml
    qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1693130&r1=1693129&r2=1693130&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Jul 28 16:44:38 2015
@@ -21,7 +21,6 @@
 package org.apache.qpid.client;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -133,7 +132,6 @@ public abstract class BasicMessageConsum
     private final boolean _autoClose;
 
     private final boolean _browseOnly;
-    private List<StackTraceElement> _closedStack = null;
 
     private boolean _isDurableSubscriber = false;
     private int _addressType = AMQDestination.UNKNOWN_TYPE;
@@ -585,18 +583,6 @@ public abstract class BasicMessageConsum
         if (!setClosed())
         {
             setClosing(true);
-            if (_logger.isDebugEnabled())
-            {
-                StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
-                if (_closedStack != null)
-                {
-                    _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
-                }
-                else
-                {
-                    _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
-                }
-            }
 
             if (sendClose)
             {
@@ -673,27 +659,7 @@ public abstract class BasicMessageConsum
      */
     void markClosed()
     {
-        // synchronized (_closed)
-        {
-            setClosed();
-
-            if (_logger.isDebugEnabled())
-            {
-                if (_closedStack != null)
-                {
-                    StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
-                    _logger.debug(_consumerTag + " markClosed():"
-                                  + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
-                    _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
-                }
-                else
-                {
-                	StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
-                    _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
-                }
-            }
-        }
-
+        setClosed();
         deregisterConsumer();
     }
 
@@ -850,24 +816,7 @@ public abstract class BasicMessageConsum
 
     void notifyError(Throwable cause)
     {
-        // synchronized (_closed)
-        {
-            setClosed();
-            if (_logger.isDebugEnabled())
-            {
-                StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
-                if (_closedStack != null)
-                {
-                    _logger.debug(_consumerTag + " notifyError():"
-                                  + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
-                    _logger.debug(_consumerTag + " previously" + _closedStack.toString());
-                }
-                else
-                {
-                    _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
-                }
-            }
-        }
+        setClosed();
         // QPID-293 can "request redelivery of this error through dispatcher"
 
         // we have no way of propagating the exception to a message listener - a JMS limitation - so we

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1693130&r1=1693129&r2=1693130&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Tue Jul 28 16:44:38 2015
@@ -74,57 +74,10 @@ import org.apache.qpid.transport.network
 import org.apache.qpid.transport.network.TransportActivity;
 import org.apache.qpid.util.BytesDataOutput;
 
-/**
- * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
- * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the
- * specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the
- * event on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP,
- * expressed in terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in
- * terms of "message received" and so on.
- * <p>
- * There is a 1:1 mapping between an AMQProtocolHandler and an {@link AMQConnection}. The connection class is
- * exposed to the end user of the AMQP client API, and also implements the JMS Connection API, so provides the public
- * API calls through which an individual connection can be manipulated. This protocol handler talks to the network
- * through MINA, in a behind the scenes role; it is not an exposed part of the client API.
- * <p>
- * There is a 1:many mapping between an AMQProtocolHandler and a set of {@link AMQSession}s. At the MINA level,
- * there is one session per connection. At the AMQP level there can be many channels which are also called sessions in
- * JMS parlance. The {@link AMQSession}s are managed through an {@link AMQProtocolSession} instance. The protocol
- * session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA sessions
- * in the event of failover. See below for more information about this.
- * <p>
- * Mina provides a session container that can be used to store/retrieve arbitrary objects as String named
- * attributes. A more convenient, type-safe, container for session data is provided in the form of
- * {@link AMQProtocolSession}.
- *
- * <p>
- * A common way to use MINA is to have a single instance of the event handler, and for MINA to pass in its session
- * object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe wrapper
- * as described above). This event handler is different, because dealing with failover complicates things. To the
- * end client of an AMQConnection, a failed over connection is still handled through the same connection instance, but
- * behind the scenes a new transport connection, and MINA session will have been created. The MINA session object cannot
- * be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the old
- * connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection
- * and the protocol session data is held outside of the MINA IOSession.
- * <p>
- * This handler is responsible for setting up the filter chain to filter all events for this handler through.
- * The filter chain is set up as a stack of event handers that perform the following functions (working upwards from
- * the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work,
- * optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself.
- * <p>
- * TODO  Use a single handler instance, by shifting everything to do with the 'protocol session' state, including
- * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
- * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
- * be merged, although there is sense in keeping the session model separate. Will clarify things by having data
- * held per protocol handler, per protocol session, per network connection, per channel, in separate classes, so
- * that lifecycles of the fields match lifecycles of their containing objects.
- */
 public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver, TransportActivity
 {
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
-    private static final Logger _protocolLogger = LoggerFactory.getLogger("qpid.protocol");
-    private static final boolean PROTOCOL_DEBUG = (System.getProperty("amqj.protocol.logging.level") != null);
 
     private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
     private static final String AMQJ_DEFAULT_SYNCWRITE_TIMEOUT = "amqj.default_syncwrite_timeout";
@@ -133,10 +86,10 @@ public class AMQProtocolHandler implemen
      * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection
      * instances and protocol handler instances.
      */
-    private AMQConnection _connection;
+    private final AMQConnection _connection;
 
     /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */
-    private volatile AMQProtocolSession _protocolSession;
+    private final AMQProtocolSession _protocolSession;
 
     /** Holds the state of the protocol session. */
     private AMQStateManager _stateManager;
@@ -149,7 +102,7 @@ public class AMQProtocolHandler implemen
      * to be able to send errors during failover back to the client application. The session won't be available in the
      * case where we failing over due to a Connection.Redirect message from the broker.
      */
-    private FailoverHandler _failoverHandler;
+    private final FailoverHandler _failoverHandler;
 
     /**
      * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly
@@ -188,6 +141,11 @@ public class AMQProtocolHandler implemen
     private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
     private Throwable _initialConnectionException;
 
+    private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
+    private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY];
+    private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes);
+    private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes);
+
     /**
      * Creates a new protocol handler, associated with the specified client connection instance.
      *
@@ -473,42 +431,39 @@ public class AMQProtocolHandler implemen
             for (int i = 0; i < size; i++)
             {
                 AMQDataBlock message = dataBlocks.get(i);
-                    if (PROTOCOL_DEBUG)
-                    {
-                        _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
-                    }
+                _logger.debug("RECV: {}", message);
 
-                    if(message instanceof AMQFrame)
-                    {
+                if(message instanceof AMQFrame)
+                {
 
-                        final long msgNumber = ++_messageReceivedCount;
+                    final long msgNumber = ++_messageReceivedCount;
 
-                        if (((msgNumber % 1000) == 0) && _logger.isDebugEnabled())
-                        {
-                            _logger.debug("Received " + _messageReceivedCount + " protocol messages");
-                        }
+                    if (((msgNumber % 1000) == 0) && _logger.isDebugEnabled())
+                    {
+                        _logger.debug("Received {} protocol messages", _messageReceivedCount);
+                    }
 
-                        AMQFrame frame = (AMQFrame) message;
+                    AMQFrame frame = (AMQFrame) message;
 
-                        final AMQBody bodyFrame = frame.getBodyFrame();
+                    final AMQBody bodyFrame = frame.getBodyFrame();
 
-                        bodyFrame.handle(frame.getChannel(), _protocolSession);
+                    bodyFrame.handle(frame.getChannel(), _protocolSession);
 
-                        _connection.bytesReceived(_readBytes);
-                    }
-                    else if (message instanceof ProtocolInitiation)
-                    {
-                        // We get here if the server sends a response to our initial protocol header
-                        // suggesting an alternate ProtocolVersion; the server will then close the
-                        // connection.
-                        ProtocolInitiation protocolInit = (ProtocolInitiation) message;
-                        _suggestedProtocolVersion = protocolInit.checkVersion();
-                        _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion);
+                    _connection.bytesReceived(_readBytes);
+                }
+                else if (message instanceof ProtocolInitiation)
+                {
+                    // We get here if the server sends a response to our initial protocol header
+                    // suggesting an alternate ProtocolVersion; the server will then close the
+                    // connection.
+                    ProtocolInitiation protocolInit = (ProtocolInitiation) message;
+                    _suggestedProtocolVersion = protocolInit.checkVersion();
+                    _logger.info("Broker suggested using protocol version: {} ", _suggestedProtocolVersion);
 
-                        // get round a bug in old versions of qpid whereby the connection is not closed
-                        _stateManager.changeState(AMQState.CONNECTION_CLOSED);
-                    }
+                    // get round a bug in old versions of qpid whereby the connection is not closed
+                    _stateManager.changeState(AMQState.CONNECTION_CLOSED);
                 }
+            }
         }
         catch (Exception e)
         {
@@ -527,12 +482,6 @@ public class AMQProtocolHandler implemen
     public void methodBodyReceived(final int channelId, final AMQBody bodyFrame)
             throws QpidException
     {
-
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + bodyFrame);
-        }
-
         final AMQMethodEvent<AMQMethodBody> evt =
                 new AMQMethodEvent<AMQMethodBody>(channelId, (AMQMethodBody) bodyFrame);
 
@@ -590,10 +539,7 @@ public class AMQProtocolHandler implemen
             _sender.flush();
         }
 
-        if (PROTOCOL_DEBUG)
-        {
-            _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame));
-        }
+        _logger.debug("SEND: {}", frame);
 
         final long sentMessages = _messagesOut++;
 
@@ -601,18 +547,13 @@ public class AMQProtocolHandler implemen
 
         if (debug && ((sentMessages % 1000) == 0))
         {
-            _logger.debug("Sent " + _messagesOut + " protocol messages");
+            _logger.debug("Sent {} protocol messages", _messagesOut);
         }
 
         _connection.bytesSent(_writtenBytes);
 
     }
 
-    private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
-    private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY];
-    private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes);
-    private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes);
-
     private ByteBuffer asByteBuffer(AMQDataBlock block)
     {
         final int size = (int) block.getSize();

Modified: qpid/java/trunk/pom.xml
URL: http://svn.apache.org/viewvc/qpid/java/trunk/pom.xml?rev=1693130&r1=1693129&r2=1693130&view=diff
==============================================================================
--- qpid/java/trunk/pom.xml (original)
+++ qpid/java/trunk/pom.xml Tue Jul 28 16:44:38 2015
@@ -220,7 +220,6 @@
             <broker.config>${qpid.home}${file.separator}etc${file.separator}config-systests.json</broker.config>
             <max_prefetch>1000</max_prefetch>
             <qpid.dest_syntax>BURL</qpid.dest_syntax>
-            <amqj.protocol.logging.level>debug</amqj.protocol.logging.level>
             <test.port>15672</test.port>
             <test.mport>18999</test.mport>
             <test.cport>19099</test.cport>

Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java?rev=1693130&r1=1693129&r2=1693130&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java Tue Jul 28 16:44:38 2015
@@ -1289,8 +1289,6 @@ public class QpidBrokerTestCase extends
         {
             jvmOptions.putAll(_propertiesSetForBroker);
 
-            copySystemProperty("amqj.protocol.logging.level", jvmOptions);
-
             copySystemProperty("test.port", jvmOptions);
             copySystemProperty("test.mport", jvmOptions);
             copySystemProperty("test.cport", jvmOptions);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org