You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/25 22:00:24 UTC

svn commit: r499968 - in /incubator/qpid/branches/qpid.0-9/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/protocol/ client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client...

Author: kpvdr
Date: Thu Jan 25 13:00:22 2007
New Revision: 499968

URL: http://svn.apache.org/viewvc?view=rev&rev=499968
Log:
Added mechanism to track connection ids for logging and debugging purposes. Changed format of log/debug messages for RequestManager and ResponseManager.

Modified:
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
    incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=499968&r1=499967&r2=499968
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Jan 25 13:00:22 2007
@@ -137,8 +137,8 @@
         _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
         _messageStore = messageStore;
         _exchanges = exchanges;
-        _requestManager = new RequestManager(channelId, _session, true);
-        _responseManager = new ResponseManager(channelId, methodListener, _session, true);
+        _requestManager = new RequestManager(_session.getConnectionId(), channelId, _session, true);
+        _responseManager = new ResponseManager(_session.getConnectionId(), channelId, methodListener, _session, true);
         _txnBuffer = new TxnBuffer(_messageStore);
     }
 

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=499968&r1=499967&r2=499968
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Thu Jan 25 13:00:22 2007
@@ -74,6 +74,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class AMQMinaProtocolSession implements AMQProtocolSession,
                                                ProtocolVersionList,
@@ -117,6 +118,10 @@
     private byte _minor;
     private FieldTable _clientProperties;
 
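+    // Class-wide tally of sessions constructed so far, used to tag log/debug output. NOTE(review): getConnectionId() reads this shared counter directly, so it reports the latest tally rather than a per-session id - confirm this is intended.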
+    private static AtomicInteger _ConnectionId;    
+    static { _ConnectionId = new AtomicInteger(0); }
+
     public ManagedObject getManagedObject()
     {
         return _managedObject;
@@ -127,6 +132,7 @@
                                   AMQCodecFactory codecFactory)
             throws AMQException
     {
+        _ConnectionId.incrementAndGet();
         _stateManager = new AMQStateManager(queueRegistry, exchangeRegistry, this);
         _minaProtocolSession = session;
         session.setAttachment(this);
@@ -144,6 +150,7 @@
                                   AMQCodecFactory codecFactory, AMQStateManager stateManager)
             throws AMQException
     {
+        _ConnectionId.incrementAndGet();
         _stateManager = stateManager;
         _minaProtocolSession = session;
         session.setAttachment(this);
@@ -699,5 +706,10 @@
         if (!versionEquals(methodBody.getMajor(), methodBody.getMinor())) {
             throw new RuntimeException("MethodBody version did not match version of current session.");
         }
+    }
+    
+    public int getConnectionId()
+    {
+        return _ConnectionId.get();
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=499968&r1=499967&r2=499968
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Thu Jan 25 13:00:22 2007
@@ -149,4 +149,5 @@
     byte getMinor();
     boolean versionEquals(byte major, byte minor);
     void checkMethodBodyVersion(AMQMethodBody methodBody);
+    int getConnectionId();
 }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=499968&r1=499967&r2=499968
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Thu Jan 25 13:00:22 2007
@@ -78,7 +78,7 @@
 public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
 {
     private static final Logger _logger = Logger.getLogger(AMQConnection.class);
-
+    
     private AtomicInteger _idFactory = new AtomicInteger(0);
 
     /**
@@ -155,6 +155,10 @@
     private AMQException _lastAMQException = null;
 
 
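+    // Class-wide tally of connections constructed so far, used to tag log/debug output. NOTE(review): getConnectionId() reads this shared counter directly, so it reports the latest tally rather than a per-connection id - confirm this is intended.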
+    private static AtomicInteger _ConnectionId;    
+    static { _ConnectionId = new AtomicInteger(0); }
+
     /*
      * The connection meta data
      */
@@ -200,7 +204,7 @@
     public AMQConnection(ConnectionURL connectionURL) throws AMQException
     {
         _logger.info("Connection:" + connectionURL);
-
+        _ConnectionId.incrementAndGet();
         if (connectionURL == null)
         {
             throw new IllegalArgumentException("Connection must be specified");
@@ -1019,5 +1023,10 @@
                 new StringRefAddr(AMQConnection.class.getName(), toURL()),
                 AMQConnectionFactory.class.getName(),
                 null);          // factory location
+    }
+    
+    public int getConnectionId()
+    {
+        return _ConnectionId.get();
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=499968&r1=499967&r2=499968
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Jan 25 13:00:22 2007
@@ -576,4 +576,9 @@
     {
         _failoverState = failoverState;
     }
+    
+    public int getConnectionId()
+    {
+        return _connection.getConnectionId();
+    }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=499968&r1=499967&r2=499968
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Thu Jan 25 13:00:22 2007
@@ -106,6 +106,8 @@
      */
     protected int _queueId = 1;
     protected final Object _queueIdLock = new Object();
+    
+    protected int _ConnectionId;
 
     /**
      * No-arg constructor for use by test subclass - has to initialise final vars
@@ -118,8 +120,9 @@
         _stateManager = new AMQStateManager(this);
 
         // Add channel 0 request and response managers, since they will not be added through the usual mechanism
-        _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false));
-        _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false));
+        _ConnectionId = 0;
+        _channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false));
+        _channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false));
     }
 
     public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
@@ -131,8 +134,9 @@
         _stateManager = new AMQStateManager(this);
 
         // Add channel 0 request and response managers, since they will not be added through the usual mechanism
-        _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false));
-        _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false));
+        _ConnectionId = _protocolHandler == null ? 0 : _protocolHandler.getConnectionId();
+        _channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false));
+        _channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false));
     }
  
     public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager)
@@ -146,8 +150,9 @@
         _stateManager.setProtocolSession(this);
                 
         // Add channel 0 request and response managers, since they will not be added through the usual mechanism
-        _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false));
-        _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false));
+        _ConnectionId = _protocolHandler == null ? 0 : _protocolHandler.getConnectionId();
+        _channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false));
+        _channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false));
     }
 
     public void init()
@@ -379,12 +384,11 @@
         // Add request and response handlers, one per channel, if they do not already exist
         if (_channelId2RequestMgrMap.get(channelId) == null)
         {
-            _channelId2RequestMgrMap.put(channelId, new RequestManager(channelId, this, false));
+            _channelId2RequestMgrMap.put(channelId, new RequestManager(_ConnectionId, channelId, this, false));
         }
         if (_channelId2ResponseMgrMap.get(channelId) == null)
         {
-            
-            _channelId2ResponseMgrMap.put(channelId, new ResponseManager(channelId, _stateManager, this, false));
+            _channelId2ResponseMgrMap.put(channelId, new ResponseManager(_ConnectionId, channelId, _stateManager, this, false));
         }
     }
 

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java?view=diff&rev=499968&r1=499967&r2=499968
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java Thu Jan 25 13:00:22 2007
@@ -40,6 +40,7 @@
      * to be known.
      */
     private boolean serverFlag;
+    private int connectionId;
 
     /**
      * Request and response frames must have a requestID and responseID which
@@ -55,11 +56,12 @@
 
     private ConcurrentHashMap<Long, AMQMethodListener> requestSentMap;
 
-    public RequestManager(int channel, AMQProtocolWriter protocolWriter, boolean serverFlag)
+    public RequestManager(int connectionId, int channel, AMQProtocolWriter protocolWriter, boolean serverFlag)
     {
         this.channel = channel;
         this.protocolWriter = protocolWriter;
         this.serverFlag = serverFlag;
+        this.connectionId = connectionId;
         requestIdCount = 1L;
         lastProcessedResponseId = 0L;
         requestSentMap = new ConcurrentHashMap<Long, AMQMethodListener>();
@@ -77,11 +79,11 @@
         protocolWriter.writeFrame(requestFrame);
         if (logger.isDebugEnabled())
         {
-            logger.debug((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + channel +
-                " Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody);
+            logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
+                "] TX REQ: Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody);
         }
-        //System.out.println((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + channel +
-        //        " Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody);
+        //System.out.println((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
+        //        "] TX REQ: Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody);
         return requestId;
     }
 
@@ -92,11 +94,11 @@
         long requestIdStop = requestIdStart + responseBody.getBatchOffset();
         if (logger.isDebugEnabled())
         {
-            logger.debug((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + channel +
-                " " + responseBody + "; " + responseBody.getMethodPayload());
+            logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX RES: " +
+                responseBody + "; " + responseBody.getMethodPayload());
         }
-        //System.out.println((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + channel +
-        //        " " + responseBody + "; " + responseBody.getMethodPayload());
+        //System.out.println((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX RES: " +
+        //        responseBody + "; " + responseBody.getMethodPayload());
         for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++)
         {
             AMQMethodListener methodListener = requestSentMap.get(requestId);

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java?view=diff&rev=499968&r1=499967&r2=499968
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java Thu Jan 25 13:00:22 2007
@@ -43,6 +43,7 @@
      * to be known.
      */
     private boolean serverFlag;
+    private int connectionId;
 
     private int maxAccumulatedResponses = 20; // Default
 //    private Class currentResponseMethodBodyClass;
@@ -83,13 +84,14 @@
 
     private ConcurrentHashMap<Long, ResponseStatus> responseMap;
 
-    public ResponseManager(int channel, AMQMethodListener methodListener,
+    public ResponseManager(int connectionId, int channel, AMQMethodListener methodListener,
         AMQProtocolWriter protocolWriter, boolean serverFlag)
     {
         this.channel = channel;
         this.methodListener = methodListener;
         this.protocolWriter = protocolWriter;
         this.serverFlag = serverFlag;
+        this.connectionId = connectionId;
         responseIdCount = 1L;
         lastReceivedRequestId = 0L;
 //        currentResponseMethodBodyClass = null;
@@ -103,11 +105,11 @@
         long requestId = requestBody.getRequestId();
         if (logger.isDebugEnabled())
         {
-            logger.debug((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel +
-                " " + requestBody + "; " + requestBody.getMethodPayload());
+            logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX REQ: " + 
+                requestBody + "; " + requestBody.getMethodPayload());
         }
-        //System.out.println((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel + 
-        //        " " + requestBody + "; " + requestBody.getMethodPayload());
+        //System.out.println((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX REQ: " + 
+        //        requestBody + "; " + requestBody.getMethodPayload());
         // TODO: responseMark is used in HA, but until then, ignore...
         long responseMark = requestBody.getResponseMark();
         lastReceivedRequestId = requestId;
@@ -122,11 +124,11 @@
     {
         if (logger.isDebugEnabled())
         {
-            logger.debug((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + channel +
-                " Res[# " + requestId + "]; " + responseMethodBody);
+            logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
+                "] TX RES: Res[# " + requestId + "]; " + responseMethodBody);
         }
-        //System.out.println((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + channel +
-        //        " Res[# " + requestId + "]; " + responseMethodBody);
+        //System.out.println((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
+        //        "] TX RES: Res[# " + requestId + "]; " + responseMethodBody);
         ResponseStatus responseStatus = responseMap.get(requestId);
         if (responseStatus == null)
             throw new RequestResponseMappingException(requestId,

Modified: incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java?view=diff&rev=499968&r1=499967&r2=499968
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java Thu Jan 25 13:00:22 2007
@@ -35,6 +35,7 @@
 import javax.security.sasl.SaslServer;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * A protocol session that can be used for testing purposes.
@@ -45,8 +46,13 @@
 
     private Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
 
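+    // Class-wide tally of mock sessions constructed so far, used to tag log/debug output. NOTE(review): getConnectionId() reads this shared counter directly, so it reports the latest tally rather than a per-session id - confirm this is intended.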
+    private static AtomicInteger _ConnectionId;    
+    static { _ConnectionId = new AtomicInteger(0); }
+
     public MockProtocolSession(MessageStore messageStore)
     {
+        _ConnectionId.incrementAndGet();
         _messageStore = messageStore;
     }
 
@@ -221,4 +227,9 @@
 		// TODO Auto-generated method stub
 		
 	}
+    
+    public int getConnectionId()
+    {
+        return _ConnectionId.get();
+    }
 }